An experimental pub/sub client and server project.
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

more tweaks

+12 -11
+12 -11
server/server.go
··· 78 78 79 79 switch action { 80 80 case Subscribe: 81 - s.handleSubscribe(peer) 81 + s.handleSubscribe(&peer) 82 82 case Unsubscribe: 83 - s.handleUnsubscribe(peer) 83 + s.handleUnsubscribe(&peer) 84 84 case Publish: 85 - s.handlePublish(peer) 85 + s.handlePublish(&peer) 86 86 default: 87 87 slog.Error("unknown action", "action", action, "peer", peer.addr()) 88 88 writeStatus(Error, "unknown action", peer.conn) 89 89 } 90 90 } 91 91 92 - func (s *Server) handleSubscribe(peer peer) { 92 + func (s *Server) handleSubscribe(peer *peer) { 93 93 // subscribe the peer to the topic 94 - s.subscribePeerToTopic(&peer) 94 + s.subscribePeerToTopic(peer) 95 95 96 96 // keep handling the peers connection, getting the action from the peer when it wishes to do something else. 97 97 // once the peers connection ends, it will be unsubscribed from all topics and returned 98 98 for { 99 - action, err := readAction(peer) 99 + action, err := readAction(*peer) 100 100 if err != nil { 101 101 var neterr net.Error 102 102 if errors.As(err, &neterr) && neterr.Timeout() { ··· 106 106 // TODO: see if there's a way to check if the peers connection has been ended etc 107 107 slog.Error("failed to read action from subscriber", "error", err, "peer", peer.addr()) 108 108 109 - s.unsubscribePeerFromAllTopics(peer) 109 + s.unsubscribePeerFromAllTopics(*peer) 110 110 111 111 return 112 112 } 113 113 114 114 switch action { 115 115 case Subscribe: 116 - s.subscribePeerToTopic(&peer) 116 + s.subscribePeerToTopic(peer) 117 117 case Unsubscribe: 118 118 s.handleUnsubscribe(peer) 119 119 default: ··· 163 163 _ = peer.connOperation(op, "subscribe peer to topic") 164 164 } 165 165 166 - func (s *Server) handleUnsubscribe(peer peer) { 166 + func (s *Server) handleUnsubscribe(peer *peer) { 167 167 op := func(conn net.Conn) error { 168 168 // get the topics the peer wishes to unsubscribe from 169 169 dataLen, err := dataLength(conn) ··· 193 193 return nil 194 194 } 195 195 196 - s.unsubscribeToTopics(peer, topics) 196 + s.unsubscribeToTopics(*peer, topics) 197 197 writeStatus(Unsubscribed, "", conn) 198 198 199 199 return nil ··· 207 207 data []byte 208 208 } 209 209 210 - func (s *Server) handlePublish(peer peer) { 210 + func (s *Server) handlePublish(peer *peer) { 211 211 for { 212 212 var message *messageToSend 213 213 ··· 338 338 return nil 339 339 } 340 340 341 + // TODO: work out why this can't take a pointer to the peer 341 342 func readAction(peer peer) (Action, error) { 342 343 var action Action 343 344 op := func(conn net.Conn) error {