An experimental pub/sub client and server project.
at main 2.6 kB view raw
1package client 2 3import ( 4 "encoding/binary" 5 "errors" 6 "fmt" 7 "log/slog" 8 "net" 9 "sync" 10 "syscall" 11 12 "github.com/willdot/messagebroker/internal/server" 13) 14 15// Publisher allows messages to be published to a server 16type Publisher struct { 17 conn net.Conn 18 connMu sync.Mutex 19 addr string 20} 21 22// NewPublisher connects to the server at the given address and registers as a publisher 23func NewPublisher(addr string) (*Publisher, error) { 24 conn, err := connect(addr) 25 if err != nil { 26 return nil, fmt.Errorf("failed to connect to server: %w", err) 27 } 28 29 return &Publisher{ 30 conn: conn, 31 addr: addr, 32 }, nil 33} 34 35func connect(addr string) (net.Conn, error) { 36 conn, err := net.Dial("tcp", addr) 37 if err != nil { 38 return nil, fmt.Errorf("failed to dial: %w", err) 39 } 40 41 err = binary.Write(conn, binary.BigEndian, server.Publish) 42 if err != nil { 43 conn.Close() 44 return nil, fmt.Errorf("failed to register publish to server: %w", err) 45 } 46 return conn, nil 47} 48 49// Close cleanly shuts down the publisher 50func (p *Publisher) Close() error { 51 return p.conn.Close() 52} 53 54// Publish will publish the given message to the server 55func (p *Publisher) PublishMessage(message *Message) error { 56 return p.publishMessageWithRetry(message, 0) 57} 58 59func (p *Publisher) publishMessageWithRetry(message *Message, attempt int) error { 60 op := func(conn net.Conn) error { 61 // send topic first 62 topic := fmt.Sprintf("topic:%s", message.Topic) 63 64 topicLenB := make([]byte, 2) 65 binary.BigEndian.PutUint16(topicLenB, uint16(len(topic))) 66 67 headers := append(topicLenB, []byte(topic)...) 68 69 messageLenB := make([]byte, 4) 70 binary.BigEndian.PutUint32(messageLenB, uint32(len(message.Data))) 71 headers = append(headers, messageLenB...) 72 73 _, err := conn.Write(append(headers, message.Data...)) 74 if err != nil { 75 return fmt.Errorf("failed to publish data to server: %w", err) 76 } 77 return nil 78 } 79 80 err := p.connOperation(op) 81 if err == nil { 82 return nil 83 } 84 85 // we can handle a broken pipe by trying to reconnect, but if it's a different error return it 86 if !errors.Is(err, syscall.EPIPE) { 87 return err 88 } 89 90 slog.Info("error is broken pipe") 91 92 if attempt >= 5 { 93 return fmt.Errorf("failed to publish message after max attempts to reconnect (%d): %w", attempt, err) 94 } 95 96 slog.Error("failed to publish message", "error", err) 97 98 conn, connectErr := connect(p.addr) 99 if connectErr != nil { 100 return fmt.Errorf("failed to reconnect after failing to publish message: %w", connectErr) 101 } 102 103 p.conn = conn 104 105 return p.publishMessageWithRetry(message, attempt+1) 106} 107 108func (p *Publisher) connOperation(op connOpp) error { 109 p.connMu.Lock() 110 defer p.connMu.Unlock() 111 112 return op(p.conn) 113}