An experimental pub/sub client and server project.
1package client
2
3import (
4 "encoding/binary"
5 "errors"
6 "fmt"
7 "log/slog"
8 "net"
9 "sync"
10 "syscall"
11
12 "github.com/willdot/messagebroker/internal/server"
13)
14
// Publisher is a client that sends messages to a server over a network
// connection. It must be created with NewPublisher.
type Publisher struct {
	// conn is the connection to the server that published messages are written to.
	conn net.Conn

	// connMu is held while an operation runs against conn.
	connMu sync.Mutex

	// addr is the server address; it is reused to dial again when reconnecting.
	addr string
}
21
22// NewPublisher connects to the server at the given address and registers as a publisher
23func NewPublisher(addr string) (*Publisher, error) {
24 conn, err := connect(addr)
25 if err != nil {
26 return nil, fmt.Errorf("failed to connect to server: %w", err)
27 }
28
29 return &Publisher{
30 conn: conn,
31 addr: addr,
32 }, nil
33}
34
35func connect(addr string) (net.Conn, error) {
36 conn, err := net.Dial("tcp", addr)
37 if err != nil {
38 return nil, fmt.Errorf("failed to dial: %w", err)
39 }
40
41 err = binary.Write(conn, binary.BigEndian, server.Publish)
42 if err != nil {
43 conn.Close()
44 return nil, fmt.Errorf("failed to register publish to server: %w", err)
45 }
46 return conn, nil
47}
48
49// Close cleanly shuts down the publisher
50func (p *Publisher) Close() error {
51 return p.conn.Close()
52}
53
54// Publish will publish the given message to the server
55func (p *Publisher) PublishMessage(message *Message) error {
56 return p.publishMessageWithRetry(message, 0)
57}
58
59func (p *Publisher) publishMessageWithRetry(message *Message, attempt int) error {
60 op := func(conn net.Conn) error {
61 // send topic first
62 topic := fmt.Sprintf("topic:%s", message.Topic)
63
64 topicLenB := make([]byte, 2)
65 binary.BigEndian.PutUint16(topicLenB, uint16(len(topic)))
66
67 headers := append(topicLenB, []byte(topic)...)
68
69 messageLenB := make([]byte, 4)
70 binary.BigEndian.PutUint32(messageLenB, uint32(len(message.Data)))
71 headers = append(headers, messageLenB...)
72
73 _, err := conn.Write(append(headers, message.Data...))
74 if err != nil {
75 return fmt.Errorf("failed to publish data to server: %w", err)
76 }
77 return nil
78 }
79
80 err := p.connOperation(op)
81 if err == nil {
82 return nil
83 }
84
85 // we can handle a broken pipe by trying to reconnect, but if it's a different error return it
86 if !errors.Is(err, syscall.EPIPE) {
87 return err
88 }
89
90 slog.Info("error is broken pipe")
91
92 if attempt >= 5 {
93 return fmt.Errorf("failed to publish message after max attempts to reconnect (%d): %w", attempt, err)
94 }
95
96 slog.Error("failed to publish message", "error", err)
97
98 conn, connectErr := connect(p.addr)
99 if connectErr != nil {
100 return fmt.Errorf("failed to reconnect after failing to publish message: %w", connectErr)
101 }
102
103 p.conn = conn
104
105 return p.publishMessageWithRetry(message, attempt+1)
106}
107
108func (p *Publisher) connOperation(op connOpp) error {
109 p.connMu.Lock()
110 defer p.connMu.Unlock()
111
112 return op(p.conn)
113}