An experimental pub/sub client and server project.

makes the call to publish message async

Changed files
+34 -16
server
+8 -6
server/server.go
··· 303 303 if message == nil { 304 304 continue 305 305 } 306 - // TODO: this can be done in a go routine because once we've got the message from the publisher, the publisher 307 - // doesn't need to wait for us to send the message to all peers 308 306 309 - topic := s.getTopic(message.topic) 310 - if topic != nil { 311 - topic.sendMessageToSubscribers(message.data) 312 - } 307 + // sending messages to the subscribers can be done async because the publisher doesn't need to wait for 308 + // subscribers to be sent the message 309 + go func() { 310 + topic := s.getTopic(message.topic) 311 + if topic != nil { 312 + topic.sendMessageToSubscribers(message.data) 313 + } 314 + }() 313 315 } 314 316 } 315 317
+7 -4
server/server_test.go
··· 283 283 err = binary.Write(publisherConn, binary.BigEndian, Publish) 284 284 require.NoError(t, err) 285 285 286 - messages := make([][]byte, 0, 10) 286 + messages := make([]string, 0, 10) 287 287 for i := 0; i < 10; i++ { 288 - messages = append(messages, []byte(fmt.Sprintf("message %d", i))) 288 + messages = append(messages, fmt.Sprintf("message %d", i)) 289 289 } 290 290 291 291 subscribeFinCh := make(chan struct{}) ··· 293 293 subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}) 294 294 go func() { 295 295 // check subscriber got all messages 296 - for _, msg := range messages { 296 + results := make([]string, 0, len(messages)) 297 + for i := 0; i < len(messages); i++ { 297 298 var topicLen uint64 298 299 err = binary.Read(subscriberConn, binary.BigEndian, &topicLen) 299 300 require.NoError(t, err) ··· 312 313 require.NoError(t, err) 313 314 require.Equal(t, int(dataLen), n) 314 315 315 - assert.Equal(t, msg, buf) 316 + results = append(results, string(buf)) 316 317 } 318 + 319 + assert.ElementsMatch(t, results, messages) 317 320 318 321 subscribeFinCh <- struct{}{} 319 322 }()
+19 -6
server/topic.go
··· 33 33 subscribers := t.subscriptions 34 34 t.mu.Unlock() 35 35 36 - for addr, subscriber := range subscribers { 37 - err := subscriber.peer.RunConnOperation(sendMessageOp(t.name, msgData)) 38 - if err != nil { 39 - slog.Error("failed to send to message", "error", err, "peer", addr) 40 - return 41 - } 36 + var wg sync.WaitGroup 37 + 38 + for _, subscriber := range subscribers { 39 + wg.Add(1) 40 + sub := subscriber 41 + go func() { 42 + defer wg.Done() 43 + sendMessage(sub, t.name, msgData) 44 + }() 45 + } 46 + 47 + wg.Wait() 48 + } 49 + 50 + func sendMessage(sub subscriber, topicName string, message []byte) { 51 + err := sub.peer.RunConnOperation(sendMessageOp(topicName, message)) 52 + if err != nil { 53 + slog.Error("failed to send to message", "error", err, "peer", sub.peer.Addr()) 54 + return 42 55 } 43 56 } 44 57