An experimental pub/sub client and server project.

Merge pull request #1 from willdot/subscriber

PubSub code

authored by willdot.net and committed by GitHub b8557aed 219f554c

+63
example/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + 8 + "github.com/willdot/messagebroker" 9 + "github.com/willdot/messagebroker/pubsub" 10 + "github.com/willdot/messagebroker/server" 11 + ) 12 + 13 + func main() { 14 + server, err := server.New(context.Background(), ":3000") 15 + if err != nil { 16 + panic(err) 17 + } 18 + defer server.Shutdown() 19 + 20 + go sendMessages() 21 + 22 + sub, err := pubsub.NewSubscriber(":3000") 23 + if err != nil { 24 + panic(err) 25 + } 26 + defer sub.Close() 27 + 28 + sub.SubscribeToTopics([]string{"topic a"}) 29 + 30 + consumer := sub.Consume(context.Background()) 31 + if consumer.Err != nil { 32 + panic(err) 33 + } 34 + 35 + for msg := range consumer.Msgs { 36 + slog.Info("received message", "message", string(msg.Data)) 37 + } 38 + 39 + } 40 + 41 + func sendMessages() { 42 + publisher, err := pubsub.NewPublisher("localhost:3000") 43 + if err != nil { 44 + panic(err) 45 + } 46 + defer publisher.Close() 47 + 48 + // send some messages 49 + i := 0 50 + for { 51 + i++ 52 + msg := messagebroker.Message{ 53 + Topic: "topic a", 54 + Data: []byte(fmt.Sprintf("message %d", i)), 55 + } 56 + 57 + err = publisher.PublishMessage(msg) 58 + if err != nil { 59 + slog.Error("failed to publish message", "error", err) 60 + continue 61 + } 62 + } 63 + }
+7
message.go
··· 1 + package messagebroker 2 + 3 + // Message represents a message that can be published or consumed 4 + type Message struct { 5 + Topic string `json:"topic"` 6 + Data []byte `json:"data"` 7 + }
+1 -1
peer.go server/peer.go
··· 1 - package messagebroker 1 + package server 2 2 3 3 import ( 4 4 "encoding/binary"
+59
pubsub/publisher.go
··· 1 + package pubsub 2 + 3 + import ( 4 + "encoding/binary" 5 + "encoding/json" 6 + "fmt" 7 + "net" 8 + 9 + "github.com/willdot/messagebroker" 10 + "github.com/willdot/messagebroker/server" 11 + ) 12 + 13 + // Publisher allows messages to be published to a server 14 + type Publisher struct { 15 + conn net.Conn 16 + } 17 + 18 + // NewPublisher connects to the server at the given address and registers as a publisher 19 + func NewPublisher(addr string) (*Publisher, error) { 20 + conn, err := net.Dial("tcp", addr) 21 + if err != nil { 22 + return nil, fmt.Errorf("failed to dial: %w", err) 23 + } 24 + 25 + err = binary.Write(conn, binary.BigEndian, server.Publish) 26 + if err != nil { 27 + conn.Close() 28 + return nil, fmt.Errorf("failed to register publish to server: %w", err) 29 + } 30 + 31 + return &Publisher{ 32 + conn: conn, 33 + }, nil 34 + } 35 + 36 + // Close cleanly shuts down the publisher 37 + func (p *Publisher) Close() error { 38 + return p.conn.Close() 39 + } 40 + 41 + // Publish will publish the given message to the server 42 + func (p *Publisher) PublishMessage(message messagebroker.Message) error { 43 + b, err := json.Marshal(message) 44 + if err != nil { 45 + return fmt.Errorf("failed to marshal message: %w", err) 46 + } 47 + 48 + err = binary.Write(p.conn, binary.BigEndian, uint32(len(b))) 49 + if err != nil { 50 + return fmt.Errorf("failed to write message size to server") 51 + } 52 + 53 + _, err = p.conn.Write(b) 54 + if err != nil { 55 + return fmt.Errorf("failed to publish data to server") 56 + } 57 + 58 + return nil 59 + }
+145
pubsub/subscriber.go
··· 1 + package pubsub 2 + 3 + import ( 4 + "context" 5 + "encoding/binary" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "net" 10 + "time" 11 + 12 + "github.com/willdot/messagebroker" 13 + "github.com/willdot/messagebroker/server" 14 + ) 15 + 16 + // Subscriber allows subscriptions to a server and the consumption of messages 17 + type Subscriber struct { 18 + conn net.Conn 19 + } 20 + 21 + // NewSubscriber will connect to the server at the given address 22 + func NewSubscriber(addr string) (*Subscriber, error) { 23 + conn, err := net.Dial("tcp", addr) 24 + if err != nil { 25 + return nil, fmt.Errorf("failed to dial: %w", err) 26 + } 27 + 28 + return &Subscriber{ 29 + conn: conn, 30 + }, nil 31 + } 32 + 33 + // Close cleanly shuts down the subscriber 34 + func (s *Subscriber) Close() error { 35 + return s.conn.Close() 36 + } 37 + 38 + // SubscribeToTopics will subscribe to the provided topics 39 + func (s *Subscriber) SubscribeToTopics(topicNames []string) error { 40 + err := binary.Write(s.conn, binary.BigEndian, server.Subscribe) 41 + if err != nil { 42 + return fmt.Errorf("failed to subscribe: %w", err) 43 + } 44 + 45 + b, err := json.Marshal(topicNames) 46 + if err != nil { 47 + return fmt.Errorf("failed to marshal topic names: %w", err) 48 + } 49 + 50 + err = binary.Write(s.conn, binary.BigEndian, uint32(len(b))) 51 + if err != nil { 52 + return fmt.Errorf("failed to write topic data length: %w", err) 53 + } 54 + 55 + _, err = s.conn.Write(b) 56 + if err != nil { 57 + return fmt.Errorf("failed to subscribe to topics: %w", err) 58 + } 59 + buf := make([]byte, 512) 60 + _, err = s.conn.Read(buf) 61 + if err != nil { 62 + return fmt.Errorf("failed to read confirmation of subscription: %w", err) 63 + } 64 + 65 + // TODO: this is soooo hacky - need to have some sort of response code 66 + if string(buf[:10]) != "subscribed" { 67 + return fmt.Errorf("failed to subscribe: '%s'", string(buf)) 68 + } 69 + 70 + return nil 71 + } 72 + 73 + // Consumer allows the consumption of messages. It is thread safe to range over the Msgs channel to consume. If during the consumer 74 + // receiving messages from the server an error occurs, it will be stored in Err 75 + type Consumer struct { 76 + Msgs chan messagebroker.Message 77 + // TODO: better error handling? Maybe a channel of errors? 78 + Err error 79 + } 80 + 81 + // Consume will create a consumer and start it running in a go routine. You can then use the Msgs channel of the consumer 82 + // to read the messages 83 + func (s *Subscriber) Consume(ctx context.Context) *Consumer { 84 + consumer := &Consumer{ 85 + Msgs: make(chan messagebroker.Message), 86 + } 87 + 88 + go s.consume(ctx, consumer) 89 + 90 + return consumer 91 + } 92 + 93 + func (s *Subscriber) consume(ctx context.Context, consumer *Consumer) { 94 + defer close(consumer.Msgs) 95 + for { 96 + if ctx.Err() != nil { 97 + return 98 + } 99 + 100 + msg, err := s.readMessage() 101 + if err != nil { 102 + consumer.Err = err 103 + return 104 + } 105 + 106 + if msg != nil { 107 + consumer.Msgs <- *msg 108 + } 109 + } 110 + } 111 + 112 + func (s *Subscriber) readMessage() (*messagebroker.Message, error) { 113 + err := s.conn.SetReadDeadline(time.Now().Add(time.Second)) 114 + if err != nil { 115 + return nil, err 116 + } 117 + 118 + var dataLen uint64 119 + err = binary.Read(s.conn, binary.BigEndian, &dataLen) 120 + if err != nil { 121 + if neterr, ok := err.(net.Error); ok && neterr.Timeout() { 122 + return nil, nil 123 + } 124 + return nil, err 125 + } 126 + 127 + if dataLen <= 0 { 128 + return nil, nil 129 + } 130 + 131 + buf := make([]byte, dataLen) 132 + _, err = s.conn.Read(buf) 133 + if err != nil { 134 + return nil, err 135 + } 136 + 137 + var msg messagebroker.Message 138 + err = json.Unmarshal(buf, &msg) 139 + if err != nil { 140 + slog.Error("failed to unmarshal message", "error", err) 141 + return nil, nil 142 + } 143 + 144 + return &msg, nil 145 + }
+148
pubsub/subscriber_test.go
··· 1 + package pubsub 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "testing" 7 + "time" 8 + 9 + "github.com/stretchr/testify/assert" 10 + "github.com/stretchr/testify/require" 11 + "github.com/willdot/messagebroker" 12 + 13 + "github.com/willdot/messagebroker/server" 14 + ) 15 + 16 + const ( 17 + serverAddr = ":3000" 18 + ) 19 + 20 + func createServer(t *testing.T) { 21 + server, err := server.New(context.Background(), serverAddr) 22 + require.NoError(t, err) 23 + 24 + t.Cleanup(func() { 25 + server.Shutdown() 26 + }) 27 + } 28 + 29 + func TestNewSubscriber(t *testing.T) { 30 + createServer(t) 31 + 32 + sub, err := NewSubscriber(serverAddr) 33 + require.NoError(t, err) 34 + 35 + t.Cleanup(func() { 36 + sub.Close() 37 + }) 38 + } 39 + 40 + func TestNewSubscriberInvalidServerAddr(t *testing.T) { 41 + createServer(t) 42 + 43 + _, err := NewSubscriber(":123456") 44 + require.Error(t, err) 45 + } 46 + 47 + func TestNewPublisher(t *testing.T) { 48 + createServer(t) 49 + 50 + sub, err := NewPublisher(serverAddr) 51 + require.NoError(t, err) 52 + 53 + t.Cleanup(func() { 54 + sub.Close() 55 + }) 56 + } 57 + 58 + func TestNewPublisherInvalidServerAddr(t *testing.T) { 59 + createServer(t) 60 + 61 + _, err := NewPublisher(":123456") 62 + require.Error(t, err) 63 + } 64 + 65 + func TestSubscribeToTopics(t *testing.T) { 66 + createServer(t) 67 + 68 + sub, err := NewSubscriber(serverAddr) 69 + require.NoError(t, err) 70 + 71 + t.Cleanup(func() { 72 + sub.Close() 73 + }) 74 + 75 + topics := []string{"topic a", "topic b"} 76 + 77 + err = sub.SubscribeToTopics(topics) 78 + require.NoError(t, err) 79 + } 80 + 81 + func TestPublishAndSubscribe(t *testing.T) { 82 + createServer(t) 83 + 84 + sub, err := NewSubscriber(serverAddr) 85 + require.NoError(t, err) 86 + 87 + t.Cleanup(func() { 88 + sub.Close() 89 + }) 90 + 91 + topics := []string{"topic a", "topic b"} 92 + 93 + err = sub.SubscribeToTopics(topics) 94 + require.NoError(t, err) 95 + 96 + ctx, cancel := context.WithCancel(context.Background()) 97 + t.Cleanup(func() { 98 + cancel() 99 + }) 100 + 101 + consumer := sub.Consume(ctx) 102 + require.NoError(t, err) 103 + 104 + var receivedMessages []messagebroker.Message 105 + 106 + consumerFinCh := make(chan struct{}) 107 + go func() { 108 + for msg := range consumer.Msgs { 109 + receivedMessages = append(receivedMessages, msg) 110 + } 111 + 112 + require.NoError(t, err) 113 + consumerFinCh <- struct{}{} 114 + }() 115 + 116 + publisher, err := NewPublisher("localhost:3000") 117 + require.NoError(t, err) 118 + t.Cleanup(func() { 119 + publisher.Close() 120 + }) 121 + 122 + // send some messages 123 + sentMessages := make([]messagebroker.Message, 0, 10) 124 + for i := 0; i < 10; i++ { 125 + msg := messagebroker.Message{ 126 + Topic: "topic a", 127 + Data: []byte(fmt.Sprintf("message %d", i)), 128 + } 129 + 130 + sentMessages = append(sentMessages, msg) 131 + 132 + err = publisher.PublishMessage(msg) 133 + require.NoError(t, err) 134 + } 135 + 136 + // give the consumer some time to read the messages -- TODO: make better! 137 + time.Sleep(time.Millisecond * 500) 138 + cancel() 139 + 140 + select { 141 + case <-consumerFinCh: 142 + break 143 + case <-time.After(time.Second): 144 + t.Fatal("timed out waiting for consumer to read messages") 145 + } 146 + 147 + assert.ElementsMatch(t, receivedMessages, sentMessages) 148 + }
+9 -9
server.go server/server.go
··· 1 - package messagebroker 1 + package server 2 2 3 3 import ( 4 4 "context" ··· 8 8 "net" 9 9 "strings" 10 10 "sync" 11 + 12 + "github.com/willdot/messagebroker" 11 13 ) 12 14 13 15 // Action represents the type of action that a peer requests to do ··· 19 21 Publish Action = 3 20 22 ) 21 23 22 - type Message struct { 23 - Topic string `json:"topic"` 24 - Data []byte `json:"data"` 25 - } 26 - 24 + // Server accepts subscribe and publish connections and passes messages around 27 25 type Server struct { 28 26 addr string 29 27 lis net.Listener ··· 32 30 topics map[string]topic 33 31 } 34 32 35 - func NewServer(ctx context.Context, addr string) (*Server, error) { 33 + // New creates and starts a new server 34 + func New(ctx context.Context, addr string) (*Server, error) { 36 35 lis, err := net.Listen("tcp", addr) 37 36 if err != nil { 38 37 return nil, fmt.Errorf("failed to listen: %w", err) ··· 48 47 return srv, nil 49 48 } 50 49 50 + // Shutdown will cleanly shutdown the server 51 51 func (s *Server) Shutdown() error { 52 52 return s.lis.Close() 53 53 } ··· 204 204 return 205 205 } 206 206 207 - var msg Message 207 + var msg messagebroker.Message 208 208 err = json.Unmarshal(buf, &msg) 209 209 if err != nil { 210 210 _, _ = peer.Write([]byte("invalid message")) ··· 234 234 t = newTopic(topicName) 235 235 } 236 236 237 - t.subscriptions[peer.addr()] = Subscriber{ 237 + t.subscriptions[peer.addr()] = subscriber{ 238 238 peer: peer, 239 239 currentOffset: 0, 240 240 }
+6 -5
server_test.go server/server_test.go
··· 1 - package messagebroker 1 + package server 2 2 3 3 import ( 4 4 "context" ··· 11 11 12 12 "github.com/stretchr/testify/assert" 13 13 "github.com/stretchr/testify/require" 14 + "github.com/willdot/messagebroker" 14 15 ) 15 16 16 17 func createServer(t *testing.T) *Server { 17 - srv, err := NewServer(context.Background(), ":3000") 18 + srv, err := New(context.Background(), ":3000") 18 19 require.NoError(t, err) 19 20 20 21 t.Cleanup(func() { ··· 28 29 srv := createServer(t) 29 30 srv.topics[topicName] = topic{ 30 31 name: topicName, 31 - subscriptions: make(map[net.Addr]Subscriber), 32 + subscriptions: make(map[net.Addr]subscriber), 32 33 } 33 34 34 35 return srv ··· 205 206 require.NoError(t, err) 206 207 207 208 // send a message 208 - msg := Message{ 209 + msg := messagebroker.Message{ 209 210 Topic: "topic a", 210 211 Data: []byte("hello world"), 211 212 } ··· 247 248 248 249 messages := make([][]byte, 0, 10) 249 250 for i := 0; i < 10; i++ { 250 - msg := Message{ 251 + msg := messagebroker.Message{ 251 252 Topic: "topic a", 252 253 Data: []byte(fmt.Sprintf("message %d", i)), 253 254 }
+3 -3
subscriber.go server/subscriber.go
··· 1 - package messagebroker 1 + package server 2 2 3 3 import ( 4 4 "encoding/binary" 5 5 "fmt" 6 6 ) 7 7 8 - type Subscriber struct { 8 + type subscriber struct { 9 9 peer peer 10 10 currentOffset int 11 11 } 12 12 13 - func (s *Subscriber) SendMessage(msg []byte) error { 13 + func (s *subscriber) sendMessage(msg []byte) error { 14 14 dataLen := uint64(len(msg)) 15 15 16 16 err := binary.Write(&s.peer, binary.BigEndian, dataLen)
+7 -5
topic.go server/topic.go
··· 1 - package messagebroker 1 + package server 2 2 3 3 import ( 4 4 "encoding/json" 5 5 "log/slog" 6 6 "net" 7 7 "sync" 8 + 9 + "github.com/willdot/messagebroker" 8 10 ) 9 11 10 12 type topic struct { 11 13 name string 12 - subscriptions map[net.Addr]Subscriber 14 + subscriptions map[net.Addr]subscriber 13 15 mu sync.Mutex 14 16 } 15 17 16 18 func newTopic(name string) topic { 17 19 return topic{ 18 20 name: name, 19 - subscriptions: make(map[net.Addr]Subscriber), 21 + subscriptions: make(map[net.Addr]subscriber), 20 22 } 21 23 } 22 24 ··· 28 30 delete(t.subscriptions, addr) 29 31 } 30 32 31 - func (t *topic) sendMessageToSubscribers(msg Message) { 33 + func (t *topic) sendMessageToSubscribers(msg messagebroker.Message) { 32 34 t.mu.Lock() 33 35 subscribers := t.subscriptions 34 36 t.mu.Unlock() ··· 39 41 } 40 42 41 43 for addr, subscriber := range subscribers { 42 - err := subscriber.SendMessage(msgData) 44 + err := subscriber.sendMessage(msgData) 43 45 if err != nil { 44 46 slog.Error("failed to send to message", "error", err, "peer", addr) 45 47 continue