An experimental pub/sub client and server project.

nack/ack

+1 -4
example/main.go
··· 59 i := 0 60 for { 61 i++ 62 - msg := pubsub.Message{ 63 - Topic: "topic a", 64 - Data: []byte(fmt.Sprintf("message %d", i)), 65 - } 66 67 err = publisher.PublishMessage(msg) 68 if err != nil {
··· 59 i := 0 60 for { 61 i++ 62 + msg := pubsub.NewMessage("topic a", []byte(fmt.Sprintf("message %d", i))) 63 64 err = publisher.PublishMessage(msg) 65 if err != nil {
+2 -1
example/server/main.go
··· 5 "os" 6 "os/signal" 7 "syscall" 8 9 "github.com/willdot/messagebroker/server" 10 ) 11 12 func main() { 13 - srv, err := server.New(":3000") 14 if err != nil { 15 log.Fatal(err) 16 }
··· 5 "os" 6 "os/signal" 7 "syscall" 8 + "time" 9 10 "github.com/willdot/messagebroker/server" 11 ) 12 13 func main() { 14 + srv, err := server.New(":3000", time.Second, time.Second*2) 15 if err != nil { 16 log.Fatal(err) 17 }
+16
pubsub/message.go
··· 4 type Message struct { 5 Topic string `json:"topic"` 6 Data []byte `json:"data"` 7 }
··· 4 type Message struct { 5 Topic string `json:"topic"` 6 Data []byte `json:"data"` 7 + 8 + ack chan bool 9 + } 10 + 11 + // NewMessage creates a new message 12 + func NewMessage(topic string, data []byte) *Message { 13 + return &Message{ 14 + Topic: topic, 15 + Data: data, 16 + ack: make(chan bool), 17 + } 18 + } 19 + 20 + // Ack will send the provided value of the ack to the server 21 + func (m *Message) Ack(ack bool) { 22 + m.ack <- ack 23 }
+1 -1
pubsub/publisher.go
··· 39 } 40 41 // Publish will publish the given message to the server 42 - func (p *Publisher) PublishMessage(message Message) error { 43 op := func(conn net.Conn) error { 44 // send topic first 45 topic := fmt.Sprintf("topic:%s", message.Topic)
··· 39 } 40 41 // Publish will publish the given message to the server 42 + func (p *Publisher) PublishMessage(message *Message) error { 43 op := func(conn net.Conn) error { 44 // send topic first 45 topic := fmt.Sprintf("topic:%s", message.Topic)
+23 -17
pubsub/subscriber.go
··· 143 // Consumer allows the consumption of messages. If during the consumer receiving messages from the 144 // server an error occurs, it will be stored in Err 145 type Consumer struct { 146 - msgs chan Message 147 // TODO: better error handling? Maybe a channel of errors? 148 Err error 149 } 150 151 // Messages returns a channel in which this consumer will put messages onto. It is safe to range over the channel since it will be closed once 152 // the consumer has finished either due to an error or from being cancelled. 153 - func (c *Consumer) Messages() <-chan Message { 154 return c.msgs 155 } 156 ··· 158 // to read the messages 159 func (s *Subscriber) Consume(ctx context.Context) *Consumer { 160 consumer := &Consumer{ 161 - msgs: make(chan Message), 162 } 163 164 go s.consume(ctx, consumer) ··· 173 return 174 } 175 176 - msg, err := s.readMessage() 177 if err != nil { 178 consumer.Err = err 179 return 180 } 181 - 182 - if msg != nil { 183 - consumer.msgs <- *msg 184 - } 185 } 186 } 187 188 - func (s *Subscriber) readMessage() (*Message, error) { 189 - var msg *Message 190 op := func(conn net.Conn) error { 191 err := s.conn.SetReadDeadline(time.Now().Add(time.Second)) 192 if err != nil { ··· 225 return err 226 } 227 228 - msg = &Message{ 229 - Data: dataBuf, 230 - Topic: string(topicBuf), 231 } 232 233 - return nil 234 235 } 236 237 err := s.connOperation(op) 238 if err != nil { 239 var neterr net.Error 240 if errors.As(err, &neterr) && neterr.Timeout() { 241 - return nil, nil 242 } 243 - return nil, err 244 } 245 246 - return msg, err 247 } 248 249 func (s *Subscriber) connOperation(op connOpp) error {
··· 143 // Consumer allows the consumption of messages. If during the consumer receiving messages from the 144 // server an error occurs, it will be stored in Err 145 type Consumer struct { 146 + msgs chan *Message 147 // TODO: better error handling? Maybe a channel of errors? 148 Err error 149 } 150 151 // Messages returns a channel in which this consumer will put messages onto. It is safe to range over the channel since it will be closed once 152 // the consumer has finished either due to an error or from being cancelled. 153 + func (c *Consumer) Messages() <-chan *Message { 154 return c.msgs 155 } 156 ··· 158 // to read the messages 159 func (s *Subscriber) Consume(ctx context.Context) *Consumer { 160 consumer := &Consumer{ 161 + msgs: make(chan *Message), 162 } 163 164 go s.consume(ctx, consumer) ··· 173 return 174 } 175 176 + err := s.readMessage(consumer.msgs) 177 if err != nil { 178 consumer.Err = err 179 return 180 } 181 } 182 } 183 184 + func (s *Subscriber) readMessage(msgChan chan *Message) error { 185 + // var msg *Message 186 op := func(conn net.Conn) error { 187 err := s.conn.SetReadDeadline(time.Now().Add(time.Second)) 188 if err != nil { ··· 221 return err 222 } 223 224 + msg := NewMessage(string(topicBuf), dataBuf) 225 + 226 + msgChan <- msg 227 + 228 + ack := <-msg.ack 229 + 230 + ackMessage := server.Nack 231 + if ack { 232 + ackMessage = server.Ack 233 } 234 235 + err = binary.Write(s.conn, binary.BigEndian, ackMessage) 236 + if err != nil { 237 + return fmt.Errorf("failed to ack/nack message: %w", err) 238 + } 239 240 + return nil 241 } 242 243 err := s.connOperation(op) 244 if err != nil { 245 var neterr net.Error 246 if errors.As(err, &neterr) && neterr.Timeout() { 247 + return nil 248 } 249 + return err 250 } 251 252 + return err 253 } 254 255 func (s *Subscriber) connOperation(op connOpp) error {
+88 -36
pubsub/subscriber_test.go
··· 19 ) 20 21 func createServer(t *testing.T) { 22 - server, err := server.New(serverAddr) 23 require.NoError(t, err) 24 25 t.Cleanup(func() { ··· 105 consumer := sub.Consume(ctx) 106 require.NoError(t, err) 107 108 - var receivedMessages []Message 109 consumerFinCh := make(chan struct{}) 110 go func() { 111 for msg := range consumer.Messages() { 112 receivedMessages = append(receivedMessages, msg) 113 } 114 115 - require.NoError(t, err) 116 consumerFinCh <- struct{}{} 117 }() 118 ··· 125 publisher.Close() 126 }) 127 128 - msg := Message{ 129 - Topic: topicA, 130 - Data: []byte("hello world"), 131 - } 132 133 err = publisher.PublishMessage(msg) 134 require.NoError(t, err) ··· 151 } 152 153 func TestPublishAndSubscribe(t *testing.T) { 154 - createServer(t) 155 - 156 - sub, err := NewSubscriber(serverAddr) 157 - require.NoError(t, err) 158 - 159 - t.Cleanup(func() { 160 - sub.Close() 161 - }) 162 - 163 - topics := []string{topicA, topicB} 164 - 165 - err = sub.SubscribeToTopics(topics) 166 - require.NoError(t, err) 167 - 168 - ctx, cancel := context.WithCancel(context.Background()) 169 - t.Cleanup(func() { 170 - cancel() 171 - }) 172 - 173 - consumer := sub.Consume(ctx) 174 - require.NoError(t, err) 175 176 - var receivedMessages []Message 177 178 consumerFinCh := make(chan struct{}) 179 go func() { 180 for msg := range consumer.Messages() { 181 receivedMessages = append(receivedMessages, msg) 182 } 183 184 - require.NoError(t, err) 185 consumerFinCh <- struct{}{} 186 }() 187 ··· 192 }) 193 194 // send some messages 195 - sentMessages := make([]Message, 0, 10) 196 for i := 0; i < 10; i++ { 197 - msg := Message{ 198 - Topic: topicA, 199 - Data: []byte(fmt.Sprintf("message %d", i)), 200 - } 201 202 sentMessages = append(sentMessages, msg) 203 ··· 212 select { 213 case <-consumerFinCh: 214 break 215 - case <-time.After(time.Second): 216 t.Fatal("timed out waiting for consumer to read messages") 217 } 218 219 assert.ElementsMatch(t, receivedMessages, sentMessages) 220 }
··· 19 ) 20 21 func createServer(t *testing.T) { 22 + server, err := server.New(serverAddr, time.Millisecond*100, time.Millisecond*100) 23 require.NoError(t, err) 24 25 t.Cleanup(func() { ··· 105 consumer := sub.Consume(ctx) 106 require.NoError(t, err) 107 108 + var receivedMessages []*Message 109 consumerFinCh := make(chan struct{}) 110 go func() { 111 for msg := range consumer.Messages() { 112 + msg.Ack(true) 113 receivedMessages = append(receivedMessages, msg) 114 } 115 116 consumerFinCh <- struct{}{} 117 }() 118 ··· 125 publisher.Close() 126 }) 127 128 + msg := NewMessage(topicA, []byte("hello world")) 129 130 err = publisher.PublishMessage(msg) 131 require.NoError(t, err) ··· 148 } 149 150 func TestPublishAndSubscribe(t *testing.T) { 151 + consumer, cancel := setupConsumer(t) 152 153 + var receivedMessages []*Message 154 155 consumerFinCh := make(chan struct{}) 156 go func() { 157 for msg := range consumer.Messages() { 158 + msg.Ack(true) 159 receivedMessages = append(receivedMessages, msg) 160 } 161 162 consumerFinCh <- struct{}{} 163 }() 164 ··· 169 }) 170 171 // send some messages 172 + sentMessages := make([]*Message, 0, 10) 173 for i := 0; i < 10; i++ { 174 + msg := NewMessage(topicA, []byte(fmt.Sprintf("message %d", i))) 175 176 sentMessages = append(sentMessages, msg) 177 ··· 186 select { 187 case <-consumerFinCh: 188 break 189 + case <-time.After(time.Second * 5): 190 t.Fatal("timed out waiting for consumer to read messages") 191 } 192 193 + // THIS IS SO HACKY 194 + for _, msg := range receivedMessages { 195 + msg.ack = nil 196 + } 197 + 198 + for _, msg := range sentMessages { 199 + msg.ack = nil 200 + } 201 + 202 assert.ElementsMatch(t, receivedMessages, sentMessages) 203 } 204 + 205 + func TestPublishAndSubscribeNackMessage(t *testing.T) { 206 + consumer, cancel := setupConsumer(t) 207 + 208 + var receivedMessages []*Message 209 + 210 + consumerFinCh := make(chan struct{}) 211 + timesMsgWasReceived := 0 212 + go func() { 213 + for msg := range consumer.Messages() { 214 + msg.Ack(false) 215 + timesMsgWasReceived++ 216 + } 217 + 218 + consumerFinCh <- struct{}{} 219 + }() 220 + 221 + publisher, err := NewPublisher("localhost:9999") 222 + require.NoError(t, err) 223 + t.Cleanup(func() { 224 + publisher.Close() 225 + }) 226 + 227 + // send a message 228 + msg := NewMessage(topicA, []byte("hello world")) 229 + 230 + err = publisher.PublishMessage(msg) 231 + require.NoError(t, err) 232 + 233 + // give the consumer some time to read the messages -- TODO: make better! 234 + time.Sleep(time.Millisecond * 500) 235 + cancel() 236 + 237 + select { 238 + case <-consumerFinCh: 239 + break 240 + case <-time.After(time.Second * 5): 241 + t.Fatal("timed out waiting for consumer to read messages") 242 + } 243 + 244 + assert.Empty(t, receivedMessages) 245 + assert.Equal(t, 5, timesMsgWasReceived) 246 + } 247 + 248 + func setupConsumer(t *testing.T) (*Consumer, context.CancelFunc) { 249 + createServer(t) 250 + 251 + sub, err := NewSubscriber(serverAddr) 252 + require.NoError(t, err) 253 + 254 + t.Cleanup(func() { 255 + sub.Close() 256 + }) 257 + 258 + topics := []string{topicA, topicB} 259 + 260 + err = sub.SubscribeToTopics(topics) 261 + require.NoError(t, err) 262 + 263 + ctx, cancel := context.WithCancel(context.Background()) 264 + t.Cleanup(func() { 265 + cancel() 266 + }) 267 + 268 + consumer := sub.Consume(ctx) 269 + require.NoError(t, err) 270 + 271 + return consumer, cancel 272 + }
+17 -9
server/server.go
··· 23 Subscribe Action = 1 24 Unsubscribe Action = 2 25 Publish Action = 3 26 ) 27 28 // Status represents the status of a request ··· 54 55 mu sync.Mutex 56 topics map[string]*topic 57 } 58 59 // New creates and starts a new server 60 - func New(Addr string) (*Server, error) { 61 lis, err := net.Listen("tcp", Addr) 62 if err != nil { 63 return nil, fmt.Errorf("failed to listen: %w", err) 64 } 65 66 srv := &Server{ 67 - lis: lis, 68 - topics: map[string]*topic{}, 69 } 70 71 go srv.start() ··· 337 t = newTopic(topicName) 338 } 339 340 - t.subscriptions[peer.Addr()] = subscriber{ 341 - peer: peer, 342 - currentOffset: 0, 343 - } 344 345 s.topics[topicName] = t 346 } ··· 388 var action Action 389 op := func(conn net.Conn) error { 390 if timeout > 0 { 391 - err := conn.SetReadDeadline(time.Now().Add(timeout)) 392 - if err != nil { 393 slog.Error("failed to set connection read deadline", "error", err, "peer", peer.Addr()) 394 } 395 } 396 397 err := binary.Read(conn, binary.BigEndian, &action)
··· 23 Subscribe Action = 1 24 Unsubscribe Action = 2 25 Publish Action = 3 26 + Ack Action = 4 27 + Nack Action = 5 28 ) 29 30 // Status represents the status of a request ··· 56 57 mu sync.Mutex 58 topics map[string]*topic 59 + 60 + ackDelay time.Duration 61 + ackTimeout time.Duration 62 } 63 64 // New creates and starts a new server 65 + func New(Addr string, ackDelay, ackTimeout time.Duration) (*Server, error) { 66 lis, err := net.Listen("tcp", Addr) 67 if err != nil { 68 return nil, fmt.Errorf("failed to listen: %w", err) 69 } 70 71 srv := &Server{ 72 + lis: lis, 73 + topics: map[string]*topic{}, 74 + ackDelay: ackDelay, 75 + ackTimeout: ackTimeout, 76 } 77 78 go srv.start() ··· 344 t = newTopic(topicName) 345 } 346 347 + t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout) 348 349 s.topics[topicName] = t 350 } ··· 392 var action Action 393 op := func(conn net.Conn) error { 394 if timeout > 0 { 395 + if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil { 396 slog.Error("failed to set connection read deadline", "error", err, "peer", peer.Addr()) 397 } 398 + defer func() { 399 + if err := conn.SetReadDeadline(time.Time{}); err != nil { 400 + slog.Error("failed to reset connection read deadline", "error", err, "peer", peer.Addr()) 401 + } 402 + }() 403 } 404 405 err := binary.Read(conn, binary.BigEndian, &action)
+217 -2
server/server_test.go
··· 18 topicC = "topic c" 19 20 serverAddr = ":6666" 21 ) 22 23 func createServer(t *testing.T) *Server { 24 - srv, err := New(serverAddr) 25 require.NoError(t, err) 26 27 t.Cleanup(func() { ··· 35 srv := createServer(t) 36 srv.topics[topicName] = &topic{ 37 name: topicName, 38 - subscriptions: make(map[net.Addr]subscriber), 39 } 40 41 return srv ··· 268 buf := make([]byte, dataLen) 269 n, err := conn.Read(buf) 270 require.NoError(t, err) 271 require.Equal(t, int(dataLen), n) 272 273 assert.Equal(t, messageData, string(buf)) 274 } 275 } 276 ··· 314 require.Equal(t, int(dataLen), n) 315 316 results = append(results, string(buf)) 317 } 318 319 assert.ElementsMatch(t, results, messages) ··· 346 t.Fatal(fmt.Errorf("timed out waiting for subscriber to read messages")) 347 } 348 }
··· 18 topicC = "topic c" 19 20 serverAddr = ":6666" 21 + 22 + ackDelay = time.Millisecond * 100 23 + ackTimeout = time.Millisecond * 100 24 ) 25 26 func createServer(t *testing.T) *Server { 27 + srv, err := New(serverAddr, ackDelay, ackTimeout) 28 require.NoError(t, err) 29 30 t.Cleanup(func() { ··· 38 srv := createServer(t) 39 srv.topics[topicName] = &topic{ 40 name: topicName, 41 + subscriptions: make(map[net.Addr]*subscriber), 42 } 43 44 return srv ··· 271 buf := make([]byte, dataLen) 272 n, err := conn.Read(buf) 273 require.NoError(t, err) 274 + 275 require.Equal(t, int(dataLen), n) 276 277 assert.Equal(t, messageData, string(buf)) 278 + 279 + err = binary.Write(conn, binary.BigEndian, Ack) 280 + require.NoError(t, err) 281 } 282 } 283 ··· 321 require.Equal(t, int(dataLen), n) 322 323 results = append(results, string(buf)) 324 + 325 + err = binary.Write(subscriberConn, binary.BigEndian, Ack) 326 + require.NoError(t, err) 327 } 328 329 assert.ElementsMatch(t, results, messages) ··· 356 t.Fatal(fmt.Errorf("timed out waiting for subscriber to read messages")) 357 } 358 } 359 + 360 + func TestSendsDataToTopicSubscriberNacksThenAcks(t *testing.T) { 361 + _ = createServer(t) 362 + 363 + subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 364 + 365 + publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 366 + require.NoError(t, err) 367 + 368 + err = binary.Write(publisherConn, binary.BigEndian, Publish) 369 + require.NoError(t, err) 370 + 371 + topic := fmt.Sprintf("topic:%s", topicA) 372 + messageData := "hello world" 373 + 374 + // send topic first 375 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 376 + require.NoError(t, err) 377 + _, err = publisherConn.Write([]byte(topic)) 378 + require.NoError(t, err) 379 + 380 + // now send the data 381 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 382 + require.NoError(t, err) 383 + n, err := publisherConn.Write([]byte(messageData)) 384 + require.NoError(t, err) 385 + require.Equal(t, len(messageData), n) 386 + 387 + // check the subsribers got the data 388 + readMessage := func(conn net.Conn, ack Action) { 389 + var topicLen uint64 390 + err = binary.Read(conn, binary.BigEndian, &topicLen) 391 + require.NoError(t, err) 392 + 393 + topicBuf := make([]byte, topicLen) 394 + _, err = conn.Read(topicBuf) 395 + require.NoError(t, err) 396 + assert.Equal(t, topicA, string(topicBuf)) 397 + 398 + var dataLen uint64 399 + err = binary.Read(conn, binary.BigEndian, &dataLen) 400 + require.NoError(t, err) 401 + 402 + buf := make([]byte, dataLen) 403 + n, err := conn.Read(buf) 404 + require.NoError(t, err) 405 + 406 + require.Equal(t, int(dataLen), n) 407 + 408 + assert.Equal(t, messageData, string(buf)) 409 + 410 + err = binary.Write(conn, binary.BigEndian, ack) 411 + require.NoError(t, err) 412 + } 413 + 414 + // NACK the message and then ack it 415 + readMessage(subscriberConn, Nack) 416 + readMessage(subscriberConn, Ack) 417 + // reading for another message should now timeout but give enough time for the ack delay to kick in 418 + // should the second read of the message not have been ack'd properly 419 + var topicLen uint64 420 + _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 421 + err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 422 + require.Error(t, err) 423 + } 424 + 425 + func TestSendsDataToTopicSubscriberDoesntAckMessage(t *testing.T) { 426 + _ = createServer(t) 427 + 428 + subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 429 + 430 + publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 431 + require.NoError(t, err) 432 + 433 + err = binary.Write(publisherConn, binary.BigEndian, Publish) 434 + require.NoError(t, err) 435 + 436 + topic := fmt.Sprintf("topic:%s", topicA) 437 + messageData := "hello world" 438 + 439 + // send topic first 440 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 441 + require.NoError(t, err) 442 + _, err = publisherConn.Write([]byte(topic)) 443 + require.NoError(t, err) 444 + 445 + // now send the data 446 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 447 + require.NoError(t, err) 448 + n, err := publisherConn.Write([]byte(messageData)) 449 + require.NoError(t, err) 450 + require.Equal(t, len(messageData), n) 451 + 452 + // check the subsribers got the data 453 + readMessage := func(conn net.Conn, ack bool) { 454 + var topicLen uint64 455 + err = binary.Read(conn, binary.BigEndian, &topicLen) 456 + require.NoError(t, err) 457 + 458 + topicBuf := make([]byte, topicLen) 459 + _, err = conn.Read(topicBuf) 460 + require.NoError(t, err) 461 + assert.Equal(t, topicA, string(topicBuf)) 462 + 463 + var dataLen uint64 464 + err = binary.Read(conn, binary.BigEndian, &dataLen) 465 + require.NoError(t, err) 466 + 467 + buf := make([]byte, dataLen) 468 + n, err := conn.Read(buf) 469 + require.NoError(t, err) 470 + 471 + require.Equal(t, int(dataLen), n) 472 + 473 + assert.Equal(t, messageData, string(buf)) 474 + 475 + if ack { 476 + err = binary.Write(conn, binary.BigEndian, Ack) 477 + require.NoError(t, err) 478 + return 479 + } 480 + } 481 + 482 + // don't send ack or nack and then ack on the second attempt 483 + readMessage(subscriberConn, false) 484 + readMessage(subscriberConn, true) 485 + 486 + // reading for another message should now timeout but give enough time for the ack delay to kick in 487 + // should the second read of the message not have been ack'd properly 488 + var topicLen uint64 489 + _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 490 + err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 491 + require.Error(t, err) 492 + } 493 + 494 + func TestSendsDataToTopicSubscriberDeliveryCountTooHighWithNoAck(t *testing.T) { 495 + _ = createServer(t) 496 + 497 + subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 498 + 499 + publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 500 + require.NoError(t, err) 501 + 502 + err = binary.Write(publisherConn, binary.BigEndian, Publish) 503 + require.NoError(t, err) 504 + 505 + topic := fmt.Sprintf("topic:%s", topicA) 506 + messageData := "hello world" 507 + 508 + // send topic first 509 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 510 + require.NoError(t, err) 511 + _, err = publisherConn.Write([]byte(topic)) 512 + require.NoError(t, err) 513 + 514 + // now send the data 515 + err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 516 + require.NoError(t, err) 517 + n, err := publisherConn.Write([]byte(messageData)) 518 + require.NoError(t, err) 519 + require.Equal(t, len(messageData), n) 520 + 521 + // check the subsribers got the data 522 + readMessage := func(conn net.Conn, ack bool) { 523 + var topicLen uint64 524 + err = binary.Read(conn, binary.BigEndian, &topicLen) 525 + require.NoError(t, err) 526 + 527 + topicBuf := make([]byte, topicLen) 528 + _, err = conn.Read(topicBuf) 529 + require.NoError(t, err) 530 + assert.Equal(t, topicA, string(topicBuf)) 531 + 532 + var dataLen uint64 533 + err = binary.Read(conn, binary.BigEndian, &dataLen) 534 + require.NoError(t, err) 535 + 536 + buf := make([]byte, dataLen) 537 + n, err := conn.Read(buf) 538 + require.NoError(t, err) 539 + 540 + require.Equal(t, int(dataLen), n) 541 + 542 + assert.Equal(t, messageData, string(buf)) 543 + 544 + if ack { 545 + err = binary.Write(conn, binary.BigEndian, Ack) 546 + require.NoError(t, err) 547 + return 548 + } 549 + } 550 + 551 + // nack the message 5 times 552 + readMessage(subscriberConn, false) 553 + readMessage(subscriberConn, false) 554 + readMessage(subscriberConn, false) 555 + readMessage(subscriberConn, false) 556 + readMessage(subscriberConn, false) 557 + 558 + // reading for the message should now timeout as we have nack'd the message too many times 559 + var topicLen uint64 560 + _ = subscriberConn.SetReadDeadline(time.Now().Add(ackDelay + time.Millisecond*100)) 561 + err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 562 + require.Error(t, err) 563 + }
+124
server/subscriber.go
···
··· 1 + package server 2 + 3 + import ( 4 + "encoding/binary" 5 + "fmt" 6 + "log/slog" 7 + "net" 8 + "time" 9 + 10 + "github.com/willdot/messagebroker/server/peer" 11 + ) 12 + 13 + type subscriber struct { 14 + peer *peer.Peer 15 + topic string 16 + messages chan message 17 + 18 + ackDelay time.Duration 19 + ackTimeout time.Duration 20 + } 21 + 22 + type message struct { 23 + data []byte 24 + deliveryCount int 25 + } 26 + 27 + func newMessage(data []byte) message { 28 + return message{data: data, deliveryCount: 1} 29 + } 30 + 31 + func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration) *subscriber { 32 + s := &subscriber{ 33 + peer: peer, 34 + topic: topic, 35 + messages: make(chan message), 36 + ackDelay: ackDelay, 37 + ackTimeout: ackTimeout, 38 + } 39 + 40 + go s.sendMessages() 41 + 42 + return s 43 + } 44 + 45 + func (s *subscriber) sendMessages() { 46 + // TODO: should think about how to break out of this if the subsciber closes its connection etc 47 + for msg := range s.messages { 48 + ack, err := s.sendMessage(s.topic, msg) 49 + if err != nil { 50 + slog.Error("failed to send to message", "error", err, "peer", s.peer.Addr()) 51 + } 52 + 53 + if ack { 54 + continue 55 + } 56 + 57 + if msg.deliveryCount >= 5 { 58 + slog.Error("max delivery count for message. Dropping", "peer", s.peer.Addr()) 59 + continue 60 + } 61 + 62 + msg.deliveryCount++ 63 + s.addMessage(msg, s.ackDelay) 64 + } 65 + } 66 + 67 + func (s *subscriber) addMessage(msg message, delay time.Duration) { 68 + go func() { 69 + time.Sleep(delay) 70 + // TODO: should think about how to break out of this if the subsciber closes its connection etc 71 + s.messages <- msg 72 + }() 73 + } 74 + 75 + func (s *subscriber) sendMessage(topic string, msg message) (bool, error) { 76 + var ack bool 77 + op := func(conn net.Conn) error { 78 + topicLen := uint64(len(topic)) 79 + err := binary.Write(conn, binary.BigEndian, topicLen) 80 + if err != nil { 81 + return fmt.Errorf("failed to send topic length: %w", err) 82 + } 83 + _, err = conn.Write([]byte(topic)) 84 + if err != nil { 85 + return fmt.Errorf("failed to send topic: %w", err) 86 + } 87 + 88 + dataLen := uint64(len(msg.data)) 89 + 90 + err = binary.Write(conn, binary.BigEndian, dataLen) 91 + if err != nil { 92 + return fmt.Errorf("failed to send data length: %w", err) 93 + } 94 + 95 + _, err = conn.Write(msg.data) 96 + if err != nil { 97 + return fmt.Errorf("failed to write to peer: %w", err) 98 + } 99 + 100 + var ackRes Action 101 + if err := conn.SetReadDeadline(time.Now().Add(s.ackTimeout)); err != nil { 102 + slog.Error("failed to set connection read deadline", "error", err, "peer", s.peer.Addr()) 103 + } 104 + defer func() { 105 + if err := conn.SetReadDeadline(time.Time{}); err != nil { 106 + slog.Error("failed to reset connection read deadline", "error", err, "peer", s.peer.Addr()) 107 + } 108 + }() 109 + err = binary.Read(conn, binary.BigEndian, &ackRes) 110 + if err != nil { 111 + return fmt.Errorf("failed to read ack from peer: %w", err) 112 + } 113 + 114 + if ackRes == Ack { 115 + ack = true 116 + } 117 + 118 + return nil 119 + } 120 + 121 + err := s.peer.RunConnOperation(op) 122 + 123 + return ack, err 124 + }
+3 -57
server/topic.go
··· 1 package server 2 3 import ( 4 - "encoding/binary" 5 - "fmt" 6 - "log/slog" 7 "net" 8 "sync" 9 - 10 - "github.com/willdot/messagebroker/server/peer" 11 ) 12 13 type topic struct { 14 name string 15 - subscriptions map[net.Addr]subscriber 16 mu sync.Mutex 17 } 18 19 - type subscriber struct { 20 - peer *peer.Peer 21 - currentOffset int 22 - } 23 - 24 func newTopic(name string) *topic { 25 return &topic{ 26 name: name, 27 - subscriptions: make(map[net.Addr]subscriber), 28 } 29 } 30 ··· 33 subscribers := t.subscriptions 34 t.mu.Unlock() 35 36 - var wg sync.WaitGroup 37 - 38 for _, subscriber := range subscribers { 39 - wg.Add(1) 40 - sub := subscriber 41 - go func() { 42 - defer wg.Done() 43 - sendMessage(sub, t.name, msgData) 44 - }() 45 - } 46 - 47 - wg.Wait() 48 - } 49 - 50 - func sendMessage(sub subscriber, topicName string, message []byte) { 51 - err := sub.peer.RunConnOperation(sendMessageOp(topicName, message)) 52 - if err != nil { 53 - slog.Error("failed to send to message", "error", err, "peer", sub.peer.Addr()) 54 - return 55 - } 56 - } 57 - 58 - func sendMessageOp(topic string, data []byte) peer.ConnOpp { 59 - return func(conn net.Conn) error { 60 - topicLen := uint64(len(topic)) 61 - err := binary.Write(conn, binary.BigEndian, topicLen) 62 - if err != nil { 63 - return fmt.Errorf("failed to send topic length: %w", err) 64 - } 65 - _, err = conn.Write([]byte(topic)) 66 - if err != nil { 67 - return fmt.Errorf("failed to send topic: %w", err) 68 - } 69 - 70 - dataLen := uint64(len(data)) 71 - 72 - err = binary.Write(conn, binary.BigEndian, dataLen) 73 - if err != nil { 74 - return fmt.Errorf("failed to send data length: %w", err) 75 - } 76 - 77 - _, err = conn.Write(data) 78 - if err != nil { 79 - return fmt.Errorf("failed to write to peer: %w", err) 80 - } 81 - return nil 82 } 83 }
··· 1 package server 2 3 import ( 4 "net" 5 "sync" 6 ) 7 8 type topic struct { 9 name string 10 + subscriptions map[net.Addr]*subscriber 11 mu sync.Mutex 12 } 13 14 func newTopic(name string) *topic { 15 return &topic{ 16 name: name, 17 + subscriptions: make(map[net.Addr]*subscriber), 18 } 19 } 20 ··· 23 subscribers := t.subscriptions 24 t.mu.Unlock() 25 26 for _, subscriber := range subscribers { 27 + subscriber.addMessage(newMessage(msgData), 0) 28 } 29 }