An experimental pub/sub client and server project.

write to connection once

+1 -1
README.md
··· 18 18 19 19 ### Sending data via a connection 20 20 21 - When sending a message representing an action (subscribe, publish etc) then a uint8 binary message is sent. 21 + When sending a message representing an action (subscribe, publish etc) then a uint16 binary message is sent. 22 22 23 23 When sending any other data, the length of the data is to be sent first using a binary uint32 and then the actual data sent afterwards.
+1
example/main.go
··· 41 41 42 42 for msg := range consumer.Messages() { 43 43 slog.Info("received message", "message", string(msg.Data)) 44 + msg.Ack(true) 44 45 } 45 46 46 47 }
+8 -13
pubsub/publisher.go
··· 43 43 op := func(conn net.Conn) error { 44 44 // send topic first 45 45 topic := fmt.Sprintf("topic:%s", message.Topic) 46 - err := binary.Write(p.conn, binary.BigEndian, uint32(len(topic))) 47 - if err != nil { 48 - return fmt.Errorf("failed to write topic size to server") 49 - } 46 + 47 + topicLenB := make([]byte, 4) 48 + binary.BigEndian.PutUint32(topicLenB, uint32(len(topic))) 50 49 51 - _, err = p.conn.Write([]byte(topic)) 52 - if err != nil { 53 - return fmt.Errorf("failed to write topic to server") 54 - } 50 + headers := append(topicLenB, []byte(topic)...) 55 51 56 - err = binary.Write(p.conn, binary.BigEndian, uint32(len(message.Data))) 57 - if err != nil { 58 - return fmt.Errorf("failed to write message size to server") 59 - } 52 + messageLenB := make([]byte, 4) 53 + binary.BigEndian.PutUint32(messageLenB, uint32(len(message.Data))) 54 + headers = append(headers, messageLenB...) 60 55 61 - _, err = p.conn.Write(message.Data) 56 + _, err := conn.Write(append(headers, message.Data...)) 62 57 if err != nil { 63 58 return fmt.Errorf("failed to publish data to server") 64 59 }
+14 -20
pubsub/subscriber.go
··· 41 41 // SubscribeToTopics will subscribe to the provided topics 42 42 func (s *Subscriber) SubscribeToTopics(topicNames []string) error { 43 43 op := func(conn net.Conn) error { 44 - err := binary.Write(conn, binary.BigEndian, server.Subscribe) 45 - if err != nil { 46 - return fmt.Errorf("failed to subscribe: %w", err) 47 - } 44 + actionB := make([]byte, 2) 45 + binary.BigEndian.PutUint16(actionB, server.Subscribed) 46 + headers := actionB 48 47 49 48 b, err := json.Marshal(topicNames) 50 49 if err != nil { 51 50 return fmt.Errorf("failed to marshal topic names: %w", err) 52 51 } 53 52 54 - err = binary.Write(conn, binary.BigEndian, uint32(len(b))) 55 - if err != nil { 56 - return fmt.Errorf("failed to write topic data length: %w", err) 57 - } 53 + topicNamesB := make([]byte, 4) 54 + binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 55 + headers = append(headers, topicNamesB...) 58 56 59 - _, err = conn.Write(b) 57 + _, err = conn.Write(append(headers, b...)) 60 58 if err != nil { 61 59 return fmt.Errorf("failed to subscribe to topics: %w", err) 62 60 } ··· 92 90 // UnsubscribeToTopics will unsubscribe to the provided topics 93 91 func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error { 94 92 op := func(conn net.Conn) error { 95 - err := binary.Write(conn, binary.BigEndian, server.Unsubscribe) 96 - if err != nil { 97 - return fmt.Errorf("failed to unsubscribe: %w", err) 98 - } 93 + actionB := make([]byte, 2) 94 + binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe)) 95 + headers := actionB 99 96 100 97 b, err := json.Marshal(topicNames) 101 98 if err != nil { 102 99 return fmt.Errorf("failed to marshal topic names: %w", err) 103 100 } 104 101 105 - err = binary.Write(conn, binary.BigEndian, uint32(len(b))) 106 - if err != nil { 107 - return fmt.Errorf("failed to write topic data length: %w", err) 108 - } 102 + topicNamesB := make([]byte, 4) 103 + binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 104 + headers = append(headers, topicNamesB...) 109 105 110 - _, err = conn.Write(b) 106 + _, err = conn.Write(append(headers, b...)) 111 107 if err != nil { 112 108 return fmt.Errorf("failed to unsubscribe to topics: %w", err) 113 109 } ··· 230 226 return ctx.Err() 231 227 case ack = <-msg.ack: 232 228 } 233 - //ack := <-msg.ack 234 - 235 229 ackMessage := server.Nack 236 230 if ack { 237 231 ackMessage = server.Ack
+14 -19
server/server.go
··· 17 17 ) 18 18 19 19 // Action represents the type of action that a peer requests to do 20 - type Action uint8 20 + type Action uint16 21 21 22 22 const ( 23 23 Subscribe Action = 1 ··· 28 28 ) 29 29 30 30 // Status represents the status of a request 31 - type Status uint8 31 + type Status uint16 32 32 33 33 const ( 34 34 Subscribed = 1 ··· 445 445 } 446 446 447 447 func writeStatus(status Status, message string, conn net.Conn) { 448 - err := binary.Write(conn, binary.BigEndian, status) 449 - if err != nil { 450 - if !errors.Is(err, syscall.EPIPE) { 451 - slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr()) 452 - } 453 - return 454 - } 448 + statusB := make([]byte, 2) 449 + binary.BigEndian.PutUint16(statusB, uint16(status)) 450 + 451 + headers := statusB 455 452 456 - if message == "" { 457 - return 453 + if len(message) > 0 { 454 + sizeB := make([]byte, 4) 455 + binary.BigEndian.PutUint32(sizeB, uint32(len(message))) 456 + headers = append(headers, sizeB...) 458 457 } 459 458 460 459 msgBytes := []byte(message) 461 - err = binary.Write(conn, binary.BigEndian, uint32(len(msgBytes))) 462 - if err != nil { 463 - slog.Error("failed to write message length to peers connection", "error", err, "peer", conn.RemoteAddr()) 464 - return 465 - } 466 - 467 - _, err = conn.Write(msgBytes) 460 + _, err := conn.Write(append(headers, msgBytes...)) 468 461 if err != nil { 469 - slog.Error("failed to write message to peers connection", "error", err, "peer", conn.RemoteAddr()) 462 + if !errors.Is(err, syscall.EPIPE) { 463 + slog.Error("failed to write status to peers connection", "error", err, "peer", conn.RemoteAddr()) 464 + } 470 465 return 471 466 } 472 467 }
+42 -89
server/server_test.go
··· 48 48 conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 49 49 require.NoError(t, err) 50 50 51 - err = binary.Write(conn, binary.BigEndian, Subscribe) 52 - require.NoError(t, err) 53 - 54 - rawTopics, err := json.Marshal(topics) 55 - require.NoError(t, err) 56 - 57 - err = binary.Write(conn, binary.BigEndian, uint32(len(rawTopics))) 58 - require.NoError(t, err) 59 - 60 - _, err = conn.Write(rawTopics) 61 - require.NoError(t, err) 51 + subscribeOrUnsubscribetoTopics(t, conn, topics, Subscribe) 62 52 63 53 expectedRes := Subscribed 64 54 ··· 71 61 return conn 72 62 } 73 63 64 + func sendMessage(t *testing.T, conn net.Conn, topic string, message []byte) { 65 + topicLenB := make([]byte, 4) 66 + binary.BigEndian.PutUint32(topicLenB, uint32(len(topic))) 67 + 68 + headers := topicLenB 69 + headers = append(headers, []byte(topic)...) 70 + 71 + messageLenB := make([]byte, 4) 72 + binary.BigEndian.PutUint32(messageLenB, uint32(len(message))) 73 + headers = append(headers, messageLenB...) 74 + 75 + _, err := conn.Write(append(headers, message...)) 76 + require.NoError(t, err) 77 + } 78 + 79 + func subscribeOrUnsubscribetoTopics(t *testing.T, conn net.Conn, topics []string, action Action) { 80 + actionB := make([]byte, 2) 81 + binary.BigEndian.PutUint16(actionB, uint16(action)) 82 + headers := actionB 83 + 84 + b, err := json.Marshal(topics) 85 + require.NoError(t, err) 86 + 87 + topicNamesB := make([]byte, 4) 88 + binary.BigEndian.PutUint32(topicNamesB, uint32(len(b))) 89 + headers = append(headers, topicNamesB...) 90 + 91 + _, err = conn.Write(append(headers, b...)) 92 + require.NoError(t, err) 93 + } 94 + 74 95 func TestSubscribeToTopics(t *testing.T) { 75 96 // create a server with an existing topic so we can test subscribing to a new and 76 97 // existing topic ··· 93 114 assert.Len(t, srv.topics[topicB].subscriptions, 1) 94 115 assert.Len(t, srv.topics[topicC].subscriptions, 1) 95 116 96 - err := binary.Write(conn, binary.BigEndian, Unsubscribe) 97 - require.NoError(t, err) 98 - 99 117 topics := []string{topicA, topicB} 100 - rawTopics, err := json.Marshal(topics) 101 - require.NoError(t, err) 102 118 103 - err = binary.Write(conn, binary.BigEndian, uint32(len(rawTopics))) 104 - require.NoError(t, err) 105 - 106 - _, err = conn.Write(rawTopics) 107 - require.NoError(t, err) 119 + subscribeOrUnsubscribetoTopics(t, conn, topics, Unsubscribe) 108 120 109 121 expectedRes := Unsubscribed 110 122 111 123 var resp Status 112 - err = binary.Read(conn, binary.BigEndian, &resp) 124 + err := binary.Read(conn, binary.BigEndian, &resp) 113 125 require.NoError(t, err) 114 126 115 127 assert.Equal(t, expectedRes, int(resp)) ··· 140 152 require.NoError(t, err) 141 153 142 154 data := []byte("hello world") 143 - // send data length first 144 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(data))) 145 - require.NoError(t, err) 146 - n, err := publisherConn.Write(data) 147 - require.NoError(t, err) 148 - require.Equal(t, len(data), n) 155 + 156 + sendMessage(t, publisherConn, topicA, data) 149 157 150 158 assert.Len(t, srv.topics, 2) 151 159 assert.Len(t, srv.topics[topicA].subscriptions, 0) ··· 158 166 conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr)) 159 167 require.NoError(t, err) 160 168 161 - err = binary.Write(conn, binary.BigEndian, uint8(99)) 169 + err = binary.Write(conn, binary.BigEndian, uint16(99)) 162 170 require.NoError(t, err) 163 171 164 172 expectedRes := Error ··· 240 248 topic := fmt.Sprintf("topic:%s", topicA) 241 249 messageData := "hello world" 242 250 243 - // send topic first 244 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 245 - require.NoError(t, err) 246 - _, err = publisherConn.Write([]byte(topic)) 247 - require.NoError(t, err) 248 - 249 - // now send the data 250 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 251 - require.NoError(t, err) 252 - n, err := publisherConn.Write([]byte(messageData)) 253 - require.NoError(t, err) 254 - require.Equal(t, len(messageData), n) 251 + sendMessage(t, publisherConn, topic, []byte(messageData)) 255 252 256 253 // check the subsribers got the data 257 254 for _, conn := range subscribers { ··· 335 332 336 333 // send multiple messages 337 334 for _, msg := range messages { 338 - // send topic first 339 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 340 - require.NoError(t, err) 341 - _, err = publisherConn.Write([]byte(topic)) 342 - require.NoError(t, err) 343 - 344 - // now send the data 345 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(msg))) 346 - require.NoError(t, err) 347 - n, err := publisherConn.Write([]byte(msg)) 348 - require.NoError(t, err) 349 - require.Equal(t, len(msg), n) 335 + sendMessage(t, publisherConn, topic, []byte(msg)) 350 336 } 351 337 352 338 select { ··· 371 357 topic := fmt.Sprintf("topic:%s", topicA) 372 358 messageData := "hello world" 373 359 374 - // send topic first 375 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 376 - require.NoError(t, err) 377 - _, err = publisherConn.Write([]byte(topic)) 378 - require.NoError(t, err) 379 - 380 - // now send the data 381 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 382 - require.NoError(t, err) 383 - n, err := publisherConn.Write([]byte(messageData)) 384 - require.NoError(t, err) 385 - require.Equal(t, len(messageData), n) 360 + sendMessage(t, publisherConn, topic, []byte(messageData)) 386 361 387 362 // check the subsribers got the data 388 363 readMessage := func(conn net.Conn, ack Action) { ··· 436 411 topic := fmt.Sprintf("topic:%s", topicA) 437 412 messageData := "hello world" 438 413 439 - // send topic first 440 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 441 - require.NoError(t, err) 442 - _, err = publisherConn.Write([]byte(topic)) 443 - require.NoError(t, err) 444 - 445 - // now send the data 446 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 447 - require.NoError(t, err) 448 - n, err := publisherConn.Write([]byte(messageData)) 449 - require.NoError(t, err) 450 - require.Equal(t, len(messageData), n) 414 + sendMessage(t, publisherConn, topic, []byte(messageData)) 451 415 452 416 // check the subsribers got the data 453 417 readMessage := func(conn net.Conn, ack bool) { ··· 505 469 topic := fmt.Sprintf("topic:%s", topicA) 506 470 messageData := "hello world" 507 471 508 - // send topic first 509 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(topic))) 510 - require.NoError(t, err) 511 - _, err = publisherConn.Write([]byte(topic)) 512 - require.NoError(t, err) 513 - 514 - // now send the data 515 - err = binary.Write(publisherConn, binary.BigEndian, uint32(len(messageData))) 516 - require.NoError(t, err) 517 - n, err := publisherConn.Write([]byte(messageData)) 518 - require.NoError(t, err) 519 - require.Equal(t, len(messageData), n) 472 + sendMessage(t, publisherConn, topic, []byte(messageData)) 520 473 521 474 // check the subsribers got the data 522 475 readMessage := func(conn net.Conn, ack bool) {
+10 -15
server/subscriber.go
··· 87 87 func (s *subscriber) sendMessage(topic string, msg message) (bool, error) { 88 88 var ack bool 89 89 op := func(conn net.Conn) error { 90 - topicLen := uint64(len(topic)) 91 - err := binary.Write(conn, binary.BigEndian, topicLen) 92 - if err != nil { 93 - return fmt.Errorf("failed to send topic length: %w", err) 94 - } 95 - _, err = conn.Write([]byte(topic)) 96 - if err != nil { 97 - return fmt.Errorf("failed to send topic: %w", err) 98 - } 90 + // TODO: why did I chose uint64 for topic len? 91 + topicB := make([]byte, 8) 92 + binary.BigEndian.PutUint64(topicB, uint64(len(topic))) 99 93 100 - dataLen := uint64(len(msg.data)) 94 + headers := topicB 95 + headers = append(headers, []byte(topic)...) 101 96 102 - err = binary.Write(conn, binary.BigEndian, dataLen) 103 - if err != nil { 104 - return fmt.Errorf("failed to send data length: %w", err) 105 - } 97 + // TODO: if message is empty, return error? 98 + dataLenB := make([]byte, 8) 99 + binary.BigEndian.PutUint64(dataLenB, uint64(len(msg.data))) 100 + headers = append(headers, dataLenB...) 106 101 107 - _, err = conn.Write(msg.data) 102 + _, err := conn.Write(append(headers, msg.data...)) 108 103 if err != nil { 109 104 return fmt.Errorf("failed to write to peer: %w", err) 110 105 }