An experimental pub/sub client and server project.

tidy up and refactor

Changed files
+109 -95
example
server
+3
example/main.go
··· 5 5 "flag" 6 6 "fmt" 7 7 "log/slog" 8 + "time" 8 9 9 10 "github.com/willdot/messagebroker/pubsub" 10 11 ) ··· 59 60 slog.Error("failed to publish message", "error", err) 60 61 continue 61 62 } 63 + 64 + time.Sleep(time.Millisecond * 500) 62 65 } 63 66 }
-52
server/peer.go
··· 1 - package server 2 - 3 - import ( 4 - "net" 5 - "sync" 6 - ) 7 - 8 - // Status represents the status of a request 9 - type Status uint8 10 - 11 - const ( 12 - Subscribed = 1 13 - Unsubscribed = 2 14 - Error = 3 15 - ) 16 - 17 - func (s Status) String() string { 18 - switch s { 19 - case Subscribed: 20 - return "subsribed" 21 - case Unsubscribed: 22 - return "unsubscribed" 23 - case Error: 24 - return "error" 25 - } 26 - 27 - return "" 28 - } 29 - 30 - type peer struct { 31 - conn net.Conn 32 - connMu sync.Mutex 33 - } 34 - 35 - func newPeer(conn net.Conn) *peer { 36 - return &peer{ 37 - conn: conn, 38 - } 39 - } 40 - 41 - func (p *peer) addr() net.Addr { 42 - return p.conn.RemoteAddr() 43 - } 44 - 45 - type connOpp func(conn net.Conn) error 46 - 47 - func (p *peer) connOperation(op connOpp, from string) error { 48 - p.connMu.Lock() 49 - defer p.connMu.Unlock() 50 - 51 - return op(p.conn) 52 - }
+30
server/peer/peer.go
··· 1 + package peer 2 + 3 + import ( 4 + "net" 5 + "sync" 6 + ) 7 + 8 + type Peer struct { 9 + conn net.Conn 10 + connMu sync.Mutex 11 + } 12 + 13 + func New(conn net.Conn) *Peer { 14 + return &Peer{ 15 + conn: conn, 16 + } 17 + } 18 + 19 + func (p *Peer) Addr() net.Addr { 20 + return p.conn.RemoteAddr() 21 + } 22 + 23 + type ConnOpp func(conn net.Conn) error 24 + 25 + func (p *Peer) ConnOperation(op ConnOpp) error { 26 + p.connMu.Lock() 27 + defer p.connMu.Unlock() 28 + 29 + return op(p.conn) 30 + }
+71 -38
server/server.go
··· 10 10 "strings" 11 11 "sync" 12 12 "time" 13 + 14 + "github.com/willdot/messagebroker/server/peer" 13 15 ) 14 16 15 17 // Action represents the type of action that a peer requests to do ··· 21 23 Publish Action = 3 22 24 ) 23 25 26 + // Status represents the status of a request 27 + type Status uint8 28 + 29 + const ( 30 + Subscribed = 1 31 + Unsubscribed = 2 32 + Error = 3 33 + ) 34 + 35 + func (s Status) String() string { 36 + switch s { 37 + case Subscribed: 38 + return "subsribed" 39 + case Unsubscribed: 40 + return "unsubscribed" 41 + case Error: 42 + return "error" 43 + } 44 + 45 + return "" 46 + } 47 + 24 48 // Server accepts subscribe and publish connections and passes messages around 25 49 type Server struct { 26 - addr string 50 + Addr string 27 51 lis net.Listener 28 52 29 53 mu sync.Mutex ··· 31 55 } 32 56 33 57 // New creates and starts a new server 34 - func New(addr string) (*Server, error) { 35 - lis, err := net.Listen("tcp", addr) 58 + func New(Addr string) (*Server, error) { 59 + lis, err := net.Listen("tcp", Addr) 36 60 if err != nil { 37 61 return nil, fmt.Errorf("failed to listen: %w", err) 38 62 } ··· 69 93 } 70 94 71 95 func (s *Server) handleConn(conn net.Conn) { 72 - peer := newPeer(conn) 96 + peer := peer.New(conn) 73 97 74 98 action, err := readAction(peer, 0) 75 99 if err != nil { 76 - slog.Error("failed to read action from peer", "error", err, "peer", peer.addr()) 100 + slog.Error("failed to read action from peer", "error", err, "peer", peer.Addr()) 77 101 return 78 102 } 79 103 ··· 85 109 case Publish: 86 110 s.handlePublish(peer) 87 111 default: 88 - slog.Error("unknown action", "action", action, "peer", peer.addr()) 89 - writeStatus(Error, "unknown action", peer.conn) 112 + slog.Error("unknown action", "action", action, "peer", peer.Addr()) 113 + writeInvalidAction(peer) 90 114 } 91 115 } 92 116 93 - func (s *Server) handleSubscribe(peer *peer) { 117 + func (s *Server) handleSubscribe(peer *peer.Peer) { 94 118 // subscribe the peer to the topic 95 119 s.subscribePeerToTopic(peer) 96 120 ··· 105 129 continue 106 130 } 107 131 // TODO: see if there's a way to check if the peers connection has been ended etc 108 - slog.Error("failed to read action from subscriber", "error", err, "peer", peer.addr()) 132 + slog.Error("failed to read action from subscriber", "error", err, "peer", peer.Addr()) 109 133 110 134 s.unsubscribePeerFromAllTopics(*peer) 111 135 ··· 118 142 case Unsubscribe: 119 143 s.handleUnsubscribe(peer) 120 144 default: 121 - slog.Error("unknown action for subscriber", "action", action, "peer", peer.addr()) 122 - writeStatus(Error, "unknown action", peer.conn) 145 + slog.Error("unknown action for subscriber", "action", action, "peer", peer.Addr()) 146 + writeInvalidAction(peer) 123 147 continue 124 148 } 125 149 } 126 150 } 127 151 128 - func (s *Server) subscribePeerToTopic(peer *peer) { 152 + func (s *Server) subscribePeerToTopic(peer *peer.Peer) { 129 153 op := func(conn net.Conn) error { 130 154 // get the topics the peer wishes to subscribe to 131 155 dataLen, err := dataLength(conn) 132 156 if err != nil { 133 - slog.Error(err.Error(), "peer", peer.addr()) 157 + slog.Error(err.Error(), "peer", peer.Addr()) 134 158 writeStatus(Error, "invalid data length of topics provided", conn) 135 159 return nil 136 160 } ··· 142 166 buf := make([]byte, dataLen) 143 167 _, err = conn.Read(buf) 144 168 if err != nil { 145 - slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr()) 169 + slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.Addr()) 146 170 writeStatus(Error, "failed to read topic data", conn) 147 171 return nil 148 172 } ··· 150 174 var topics []string 151 175 err = json.Unmarshal(buf, &topics) 152 176 if err != nil { 153 - slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr()) 177 + slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.Addr()) 154 178 writeStatus(Error, "invalid topic data provided", conn) 155 179 return nil 156 180 } ··· 161 185 return nil 162 186 } 163 187 164 - _ = peer.connOperation(op, "subscribe peer to topic") 188 + _ = peer.ConnOperation(op) 165 189 } 166 190 167 - func (s *Server) handleUnsubscribe(peer *peer) { 191 + func (s *Server) handleUnsubscribe(peer *peer.Peer) { 168 192 op := func(conn net.Conn) error { 169 193 // get the topics the peer wishes to unsubscribe from 170 194 dataLen, err := dataLength(conn) 171 195 if err != nil { 172 - slog.Error(err.Error(), "peer", peer.addr()) 196 + slog.Error(err.Error(), "peer", peer.Addr()) 173 197 writeStatus(Error, "invalid data length of topics provided", conn) 174 198 return nil 175 199 } ··· 181 205 buf := make([]byte, dataLen) 182 206 _, err = conn.Read(buf) 183 207 if err != nil { 184 - slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.addr()) 208 + slog.Error("failed to read subscibers topic data", "error", err, "peer", peer.Addr()) 185 209 writeStatus(Error, "failed to read topic data", conn) 186 210 return nil 187 211 } ··· 189 213 var topics []string 190 214 err = json.Unmarshal(buf, &topics) 191 215 if err != nil { 192 - slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.addr()) 216 + slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.Addr()) 193 217 writeStatus(Error, "invalid topic data provided", conn) 194 218 return nil 195 219 } ··· 200 224 return nil 201 225 } 202 226 203 - _ = peer.connOperation(op, "handle unsubscribe") 227 + _ = peer.ConnOperation(op) 204 228 } 205 229 206 230 type messageToSend struct { ··· 208 232 data []byte 209 233 } 210 234 211 - func (s *Server) handlePublish(peer *peer) { 235 + func (s *Server) handlePublish(peer *peer.Peer) { 212 236 for { 213 237 var message *messageToSend 214 238 215 239 op := func(conn net.Conn) error { 216 240 dataLen, err := dataLength(conn) 217 241 if err != nil { 218 - slog.Error("failed to read data length", "error", err, "peer", peer.addr()) 242 + slog.Error("failed to read data length", "error", err, "peer", peer.Addr()) 219 243 writeStatus(Error, "invalid data length of data provided", conn) 220 244 return nil 221 245 } ··· 225 249 topicBuf := make([]byte, dataLen) 226 250 _, err = conn.Read(topicBuf) 227 251 if err != nil { 228 - slog.Error("failed to read topic from peer", "error", err, "peer", peer.addr()) 252 + slog.Error("failed to read topic from peer", "error", err, "peer", peer.Addr()) 229 253 writeStatus(Error, "failed to read topic", conn) 230 254 return nil 231 255 } 232 256 233 257 topicStr := string(topicBuf) 234 258 if !strings.HasPrefix(topicStr, "topic:") { 235 - slog.Error("topic data does not contain topic prefix", "peer", peer.addr()) 259 + slog.Error("topic data does not contain topic prefix", "peer", peer.Addr()) 236 260 writeStatus(Error, "topic data does not contain 'topic:' prefix", conn) 237 261 return nil 238 262 } ··· 240 264 241 265 dataLen, err = dataLength(conn) 242 266 if err != nil { 243 - slog.Error(err.Error(), "peer", peer.addr()) 267 + slog.Error(err.Error(), "peer", peer.Addr()) 244 268 writeStatus(Error, "invalid data length of data provided", conn) 245 269 return nil 246 270 } ··· 251 275 dataBuf := make([]byte, dataLen) 252 276 _, err = conn.Read(dataBuf) 253 277 if err != nil { 254 - slog.Error("failed to read data from peer", "error", err, "peer", peer.addr()) 278 + slog.Error("failed to read data from peer", "error", err, "peer", peer.Addr()) 255 279 writeStatus(Error, "failed to read data", conn) 256 280 return nil 257 281 } ··· 263 287 return nil 264 288 } 265 289 266 - _ = peer.connOperation(op, "handle publish") 290 + _ = peer.ConnOperation(op) 267 291 268 292 if message == nil { 269 293 continue ··· 278 302 } 279 303 } 280 304 281 - func (s *Server) subscribeToTopics(peer *peer, topics []string) { 305 + func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string) { 282 306 for _, topic := range topics { 283 307 s.addSubsciberToTopic(topic, peer) 284 308 } 285 309 } 286 310 287 - func (s *Server) addSubsciberToTopic(topicName string, peer *peer) { 311 + func (s *Server) addSubsciberToTopic(topicName string, peer *peer.Peer) { 288 312 s.mu.Lock() 289 313 defer s.mu.Unlock() 290 314 ··· 293 317 t = newTopic(topicName) 294 318 } 295 319 296 - t.subscriptions[peer.addr()] = subscriber{ 320 + t.subscriptions[peer.Addr()] = subscriber{ 297 321 peer: peer, 298 322 currentOffset: 0, 299 323 } ··· 301 325 s.topics[topicName] = t 302 326 } 303 327 304 - func (s *Server) unsubscribeToTopics(peer peer, topics []string) { 328 + func (s *Server) unsubscribeToTopics(peer peer.Peer, topics []string) { 305 329 for _, topic := range topics { 306 330 s.removeSubsciberFromTopic(topic, peer) 307 331 } 308 332 } 309 333 310 - func (s *Server) removeSubsciberFromTopic(topicName string, peer peer) { 334 + func (s *Server) removeSubsciberFromTopic(topicName string, peer peer.Peer) { 311 335 s.mu.Lock() 312 336 defer s.mu.Unlock() 313 337 ··· 316 340 return 317 341 } 318 342 319 - delete(t.subscriptions, peer.addr()) 343 + delete(t.subscriptions, peer.Addr()) 320 344 } 321 345 322 - func (s *Server) unsubscribePeerFromAllTopics(peer peer) { 346 + func (s *Server) unsubscribePeerFromAllTopics(peer peer.Peer) { 323 347 s.mu.Lock() 324 348 defer s.mu.Unlock() 325 349 326 350 for _, topic := range s.topics { 327 - delete(topic.subscriptions, peer.addr()) 351 + delete(topic.subscriptions, peer.Addr()) 328 352 } 329 353 } 330 354 ··· 339 363 return nil 340 364 } 341 365 342 - func readAction(peer *peer, timeout time.Duration) (Action, error) { 366 + func readAction(peer *peer.Peer, timeout time.Duration) (Action, error) { 343 367 var action Action 344 368 op := func(conn net.Conn) error { 345 369 if timeout > 0 { ··· 353 377 return nil 354 378 } 355 379 356 - err := peer.connOperation(op, "read action") 380 + err := peer.ConnOperation(op) 357 381 if err != nil { 358 382 return 0, fmt.Errorf("failed to read action from peer: %w", err) 359 383 } 360 384 361 385 return action, nil 386 + } 387 + 388 + func writeInvalidAction(peer *peer.Peer) { 389 + op := func(conn net.Conn) error { 390 + writeStatus(Error, "unknown action", conn) 391 + return nil 392 + } 393 + 394 + _ = peer.ConnOperation(op) 362 395 } 363 396 364 397 func dataLength(conn net.Conn) (uint32, error) {
+5 -5
server/topic.go
··· 6 6 "log/slog" 7 7 "net" 8 8 "sync" 9 + 10 + "github.com/willdot/messagebroker/server/peer" 9 11 ) 10 12 11 13 type topic struct { ··· 15 17 } 16 18 17 19 type subscriber struct { 18 - peer *peer 20 + peer *peer.Peer 19 21 currentOffset int 20 22 } 21 23 ··· 40 42 t.mu.Unlock() 41 43 42 44 for addr, subscriber := range subscribers { 43 - //sendMessageOpFunc := sendMessageOp(t.name, msgData) 44 - 45 - err := subscriber.peer.connOperation(sendMessageOp(t.name, msgData), "send message to subscribers") 45 + err := subscriber.peer.ConnOperation(sendMessageOp(t.name, msgData)) 46 46 if err != nil { 47 47 slog.Error("failed to send to message", "error", err, "peer", addr) 48 48 return ··· 50 50 } 51 51 } 52 52 53 - func sendMessageOp(topic string, data []byte) connOpp { 53 + func sendMessageOp(topic string, data []byte) peer.ConnOpp { 54 54 return func(conn net.Conn) error { 55 55 topicLen := uint64(len(topic)) 56 56 err := binary.Write(conn, binary.BigEndian, topicLen)