An experimental pub/sub client and server project.
1package client
2
3import (
4 "context"
5 "encoding/binary"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "net"
10 "sync"
11 "time"
12
13 "github.com/willdot/messagebroker/internal/server"
14)
15
16type connOpp func(conn net.Conn) error
17
18// Subscriber allows subscriptions to a server and the consumption of messages
19type Subscriber struct {
20 conn net.Conn
21 connMu sync.Mutex
22}
23
24// NewSubscriber will connect to the server at the given address
25func NewSubscriber(addr string) (*Subscriber, error) {
26 conn, err := net.Dial("tcp", addr)
27 if err != nil {
28 return nil, fmt.Errorf("failed to dial: %w", err)
29 }
30
31 return &Subscriber{
32 conn: conn,
33 }, nil
34}
35
36// Close cleanly shuts down the subscriber
37func (s *Subscriber) Close() error {
38 return s.conn.Close()
39}
40
41// SubscribeToTopics will subscribe to the provided topics
42func (s *Subscriber) SubscribeToTopics(topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
43 op := func(conn net.Conn) error {
44 return subscribeToTopics(conn, topicNames, startAtType, startAtIndex)
45 }
46
47 return s.connOperation(op)
48}
49
50// UnsubscribeToTopics will unsubscribe to the provided topics
51func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
52 op := func(conn net.Conn) error {
53 return unsubscribeToTopics(conn, topicNames)
54 }
55
56 return s.connOperation(op)
57}
58
59func subscribeToTopics(conn net.Conn, topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
60 actionB := make([]byte, 2)
61 binary.BigEndian.PutUint16(actionB, uint16(server.Subscribe))
62 headers := actionB
63
64 b, err := json.Marshal(topicNames)
65 if err != nil {
66 return fmt.Errorf("failed to marshal topic names: %w", err)
67 }
68
69 topicNamesB := make([]byte, 4)
70 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
71 headers = append(headers, topicNamesB...)
72 headers = append(headers, b...)
73
74 startAtTypeB := make([]byte, 2)
75 binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType))
76 headers = append(headers, startAtTypeB...)
77
78 if startAtType == server.From {
79 fromB := make([]byte, 2)
80 binary.BigEndian.PutUint16(fromB, uint16(startAtIndex))
81 headers = append(headers, fromB...)
82 }
83
84 _, err = conn.Write(headers)
85 if err != nil {
86 return fmt.Errorf("failed to subscribe to topics: %w", err)
87 }
88
89 var resp server.Status
90 err = binary.Read(conn, binary.BigEndian, &resp)
91 if err != nil {
92 return fmt.Errorf("failed to read confirmation of subscribe: %w", err)
93 }
94
95 if resp == server.Subscribed {
96 return nil
97 }
98
99 var dataLen uint16
100 err = binary.Read(conn, binary.BigEndian, &dataLen)
101 if err != nil {
102 return fmt.Errorf("received status %s:", resp)
103 }
104
105 buf := make([]byte, dataLen)
106 _, err = conn.Read(buf)
107 if err != nil {
108 return fmt.Errorf("received status %s:", resp)
109 }
110
111 return fmt.Errorf("received status %s - %s", resp, buf)
112}
113
114func unsubscribeToTopics(conn net.Conn, topicNames []string) error {
115 actionB := make([]byte, 2)
116 binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe))
117 headers := actionB
118
119 b, err := json.Marshal(topicNames)
120 if err != nil {
121 return fmt.Errorf("failed to marshal topic names: %w", err)
122 }
123
124 topicNamesB := make([]byte, 4)
125 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
126 headers = append(headers, topicNamesB...)
127
128 _, err = conn.Write(append(headers, b...))
129 if err != nil {
130 return fmt.Errorf("failed to unsubscribe to topics: %w", err)
131 }
132
133 var resp server.Status
134 err = binary.Read(conn, binary.BigEndian, &resp)
135 if err != nil {
136 return fmt.Errorf("failed to read confirmation of unsubscribe: %w", err)
137 }
138
139 if resp == server.Unsubscribed {
140 return nil
141 }
142
143 var dataLen uint16
144 err = binary.Read(conn, binary.BigEndian, &dataLen)
145 if err != nil {
146 return fmt.Errorf("received status %s:", resp)
147 }
148
149 buf := make([]byte, dataLen)
150 _, err = conn.Read(buf)
151 if err != nil {
152 return fmt.Errorf("received status %s:", resp)
153 }
154
155 return fmt.Errorf("received status %s - %s", resp, buf)
156}
157
158// Consumer allows the consumption of messages. If during the consumer receiving messages from the
159// server an error occurs, it will be stored in Err
160type Consumer struct {
161 msgs chan *Message
162 // TODO: better error handling? Maybe a channel of errors?
163 Err error
164}
165
166// Messages returns a channel in which this consumer will put messages onto. It is safe to range over the channel since it will be closed once
167// the consumer has finished either due to an error or from being cancelled.
168func (c *Consumer) Messages() <-chan *Message {
169 return c.msgs
170}
171
172// Consume will create a consumer and start it running in a go routine. You can then use the Msgs channel of the consumer
173// to read the messages
174func (s *Subscriber) Consume(ctx context.Context) *Consumer {
175 consumer := &Consumer{
176 msgs: make(chan *Message),
177 }
178
179 go s.consume(ctx, consumer)
180
181 return consumer
182}
183
184func (s *Subscriber) consume(ctx context.Context, consumer *Consumer) {
185 defer close(consumer.msgs)
186 for {
187 if ctx.Err() != nil {
188 return
189 }
190
191 err := s.readMessage(ctx, consumer.msgs)
192 if err != nil {
193 // TODO: if broken pipe, we need to somehow reconnect and subscribe again....YIKES
194 consumer.Err = err
195 return
196 }
197 }
198}
199
200func (s *Subscriber) readMessage(ctx context.Context, msgChan chan *Message) error {
201 op := func(conn net.Conn) error {
202 err := s.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 300))
203 if err != nil {
204 return err
205 }
206
207 var topicLen uint16
208 err = binary.Read(s.conn, binary.BigEndian, &topicLen)
209 if err != nil {
210 // TODO: check if this is needed elsewhere. I'm not sure where the read deadline resets....
211 if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
212 return nil
213 }
214 return err
215 }
216
217 topicBuf := make([]byte, topicLen)
218 _, err = s.conn.Read(topicBuf)
219 if err != nil {
220 return err
221 }
222
223 var dataLen uint64
224 err = binary.Read(s.conn, binary.BigEndian, &dataLen)
225 if err != nil {
226 return err
227 }
228
229 if dataLen <= 0 {
230 return nil
231 }
232
233 dataBuf := make([]byte, dataLen)
234 _, err = s.conn.Read(dataBuf)
235 if err != nil {
236 return err
237 }
238
239 msg := NewMessage(string(topicBuf), dataBuf)
240
241 msgChan <- msg
242
243 var ack bool
244 select {
245 case <-ctx.Done():
246 return ctx.Err()
247 case ack = <-msg.ack:
248 }
249 ackMessage := server.Nack
250 if ack {
251 ackMessage = server.Ack
252 }
253
254 err = binary.Write(s.conn, binary.BigEndian, ackMessage)
255 if err != nil {
256 return fmt.Errorf("failed to ack/nack message: %w", err)
257 }
258
259 return nil
260 }
261
262 err := s.connOperation(op)
263 if err != nil {
264 var neterr net.Error
265 if errors.As(err, &neterr) && neterr.Timeout() {
266 return nil
267 }
268 return err
269 }
270
271 return err
272}
273
274func (s *Subscriber) connOperation(op connOpp) error {
275 s.connMu.Lock()
276 defer s.connMu.Unlock()
277
278 return op(s.conn)
279}