An experimental pub/sub client and server project.
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at af1f01d5491fc5f37811103227380bc96b558e86 279 lines 6.7 kB view raw
1package client 2 3import ( 4 "context" 5 "encoding/binary" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "net" 10 "sync" 11 "time" 12 13 "github.com/willdot/messagebroker/internal/server" 14) 15 16type connOpp func(conn net.Conn) error 17 18// Subscriber allows subscriptions to a server and the consumption of messages 19type Subscriber struct { 20 conn net.Conn 21 connMu sync.Mutex 22} 23 24// NewSubscriber will connect to the server at the given address 25func NewSubscriber(addr string) (*Subscriber, error) { 26 conn, err := net.Dial("tcp", addr) 27 if err != nil { 28 return nil, fmt.Errorf("failed to dial: %w", err) 29 } 30 31 return &Subscriber{ 32 conn: conn, 33 }, nil 34} 35 36// Close cleanly shuts down the subscriber 37func (s *Subscriber) Close() error { 38 return s.conn.Close() 39} 40 41// SubscribeToTopics will subscribe to the provided topics 42func (s *Subscriber) SubscribeToTopics(topicNames []string, startAtType server.StartAtType, startAtIndex int) error { 43 op := func(conn net.Conn) error { 44 return subscribeToTopics(conn, topicNames, startAtType, startAtIndex) 45 } 46 47 return s.connOperation(op) 48} 49 50// UnsubscribeToTopics will unsubscribe to the provided topics 51func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error { 52 op := func(conn net.Conn) error { 53 return unsubscribeToTopics(conn, topicNames) 54 } 55 56 return s.connOperation(op) 57} 58 59func subscribeToTopics(conn net.Conn, topicNames []string, startAtType server.StartAtType, startAtIndex int) error { 60 actionB := make([]byte, 2) 61 binary.BigEndian.PutUint16(actionB, uint16(server.Subscribe)) 62 headers := actionB 63 64 b, err := json.Marshal(topicNames) 65 if err != nil { 66 return fmt.Errorf("failed to marshal topic names: %w", err) 67 } 68 69 topicNamesB := make([]byte, 4) 70 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 71 headers = append(headers, topicNamesB...) 72 headers = append(headers, b...) 73 74 startAtTypeB := make([]byte, 2) 75 binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType)) 76 headers = append(headers, startAtTypeB...) 77 78 if startAtType == server.From { 79 fromB := make([]byte, 2) 80 binary.BigEndian.PutUint16(fromB, uint16(startAtIndex)) 81 headers = append(headers, fromB...) 82 } 83 84 _, err = conn.Write(headers) 85 if err != nil { 86 return fmt.Errorf("failed to subscribe to topics: %w", err) 87 } 88 89 var resp server.Status 90 err = binary.Read(conn, binary.BigEndian, &resp) 91 if err != nil { 92 return fmt.Errorf("failed to read confirmation of subscribe: %w", err) 93 } 94 95 if resp == server.Subscribed { 96 return nil 97 } 98 99 var dataLen uint16 100 err = binary.Read(conn, binary.BigEndian, &dataLen) 101 if err != nil { 102 return fmt.Errorf("received status %s:", resp) 103 } 104 105 buf := make([]byte, dataLen) 106 _, err = conn.Read(buf) 107 if err != nil { 108 return fmt.Errorf("received status %s:", resp) 109 } 110 111 return fmt.Errorf("received status %s - %s", resp, buf) 112} 113 114func unsubscribeToTopics(conn net.Conn, topicNames []string) error { 115 actionB := make([]byte, 2) 116 binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe)) 117 headers := actionB 118 119 b, err := json.Marshal(topicNames) 120 if err != nil { 121 return fmt.Errorf("failed to marshal topic names: %w", err) 122 } 123 124 topicNamesB := make([]byte, 4) 125 binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 126 headers = append(headers, topicNamesB...) 127 128 _, err = conn.Write(append(headers, b...)) 129 if err != nil { 130 return fmt.Errorf("failed to unsubscribe to topics: %w", err) 131 } 132 133 var resp server.Status 134 err = binary.Read(conn, binary.BigEndian, &resp) 135 if err != nil { 136 return fmt.Errorf("failed to read confirmation of unsubscribe: %w", err) 137 } 138 139 if resp == server.Unsubscribed { 140 return nil 141 } 142 143 var dataLen uint16 144 err = binary.Read(conn, binary.BigEndian, &dataLen) 145 if err != nil { 146 return fmt.Errorf("received status %s:", resp) 147 } 148 149 buf := make([]byte, dataLen) 150 _, err = conn.Read(buf) 151 if err != nil { 152 return fmt.Errorf("received status %s:", resp) 153 } 154 155 return fmt.Errorf("received status %s - %s", resp, buf) 156} 157 158// Consumer allows the consumption of messages. If during the consumer receiving messages from the 159// server an error occurs, it will be stored in Err 160type Consumer struct { 161 msgs chan *Message 162 // TODO: better error handling? Maybe a channel of errors? 163 Err error 164} 165 166// Messages returns a channel in which this consumer will put messages onto. It is safe to range over the channel since it will be closed once 167// the consumer has finished either due to an error or from being cancelled. 168func (c *Consumer) Messages() <-chan *Message { 169 return c.msgs 170} 171 172// Consume will create a consumer and start it running in a go routine. You can then use the Msgs channel of the consumer 173// to read the messages 174func (s *Subscriber) Consume(ctx context.Context) *Consumer { 175 consumer := &Consumer{ 176 msgs: make(chan *Message), 177 } 178 179 go s.consume(ctx, consumer) 180 181 return consumer 182} 183 184func (s *Subscriber) consume(ctx context.Context, consumer *Consumer) { 185 defer close(consumer.msgs) 186 for { 187 if ctx.Err() != nil { 188 return 189 } 190 191 err := s.readMessage(ctx, consumer.msgs) 192 if err != nil { 193 // TODO: if broken pipe, we need to somehow reconnect and subscribe again....YIKES 194 consumer.Err = err 195 return 196 } 197 } 198} 199 200func (s *Subscriber) readMessage(ctx context.Context, msgChan chan *Message) error { 201 op := func(conn net.Conn) error { 202 err := s.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 300)) 203 if err != nil { 204 return err 205 } 206 207 var topicLen uint16 208 err = binary.Read(s.conn, binary.BigEndian, &topicLen) 209 if err != nil { 210 // TODO: check if this is needed elsewhere. I'm not sure where the read deadline resets.... 211 if neterr, ok := err.(net.Error); ok && neterr.Timeout() { 212 return nil 213 } 214 return err 215 } 216 217 topicBuf := make([]byte, topicLen) 218 _, err = s.conn.Read(topicBuf) 219 if err != nil { 220 return err 221 } 222 223 var dataLen uint64 224 err = binary.Read(s.conn, binary.BigEndian, &dataLen) 225 if err != nil { 226 return err 227 } 228 229 if dataLen <= 0 { 230 return nil 231 } 232 233 dataBuf := make([]byte, dataLen) 234 _, err = s.conn.Read(dataBuf) 235 if err != nil { 236 return err 237 } 238 239 msg := NewMessage(string(topicBuf), dataBuf) 240 241 msgChan <- msg 242 243 var ack bool 244 select { 245 case <-ctx.Done(): 246 return ctx.Err() 247 case ack = <-msg.ack: 248 } 249 ackMessage := server.Nack 250 if ack { 251 ackMessage = server.Ack 252 } 253 254 err = binary.Write(s.conn, binary.BigEndian, ackMessage) 255 if err != nil { 256 return fmt.Errorf("failed to ack/nack message: %w", err) 257 } 258 259 return nil 260 } 261 262 err := s.connOperation(op) 263 if err != nil { 264 var neterr net.Error 265 if errors.As(err, &neterr) && neterr.Timeout() { 266 return nil 267 } 268 return err 269 } 270 271 return err 272} 273 274func (s *Subscriber) connOperation(op connOpp) error { 275 s.connMu.Lock() 276 defer s.connMu.Unlock() 277 278 return op(s.conn) 279}