+1
-5
go.mod
+1
-5
go.mod
-4
go.sum
-4
go.sum
···
1
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3
-
github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk=
4
-
github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
5
-
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
6
-
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
7
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
8
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
9
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
···
1
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+9
-1
server/server.go
+9
-1
server/server.go
···
37
func (s Status) String() string {
38
switch s {
39
case Subscribed:
40
-
return "subsribed"
41
case Unsubscribed:
42
return "unsubscribed"
43
case Error:
···
96
97
func (s *Server) handleConn(conn net.Conn) {
98
peer := peer.New(conn)
99
100
action, err := readAction(peer, 0)
101
if err != nil {
···
119
}
120
121
func (s *Server) handleSubscribe(peer *peer.Peer) {
122
// subscribe the peer to the topic
123
s.subscribePeerToTopic(peer)
124
···
197
}
198
199
func (s *Server) handleUnsubscribe(peer *peer.Peer) {
200
op := func(conn net.Conn) error {
201
// get the topics the peer wishes to unsubscribe from
202
dataLen, err := dataLength(conn)
···
241
}
242
243
func (s *Server) handlePublish(peer *peer.Peer) {
244
for {
245
var message *messageToSend
246
···
316
}
317
318
func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string) {
319
for _, topic := range topics {
320
s.addSubsciberToTopic(topic, peer)
321
}
···
339
}
340
341
func (s *Server) unsubscribeToTopics(peer *peer.Peer, topics []string) {
342
for _, topic := range topics {
343
s.removeSubsciberFromTopic(topic, peer)
344
}
···
37
func (s Status) String() string {
38
switch s {
39
case Subscribed:
40
+
return "subscribed"
41
case Unsubscribed:
42
return "unsubscribed"
43
case Error:
···
96
97
func (s *Server) handleConn(conn net.Conn) {
98
peer := peer.New(conn)
99
+
100
+
slog.Info("handling connection", "peer", peer.Addr())
101
+
defer slog.Info("ending connection", "peer", peer.Addr())
102
103
action, err := readAction(peer, 0)
104
if err != nil {
···
122
}
123
124
func (s *Server) handleSubscribe(peer *peer.Peer) {
125
+
slog.Info("handling subscriber", "peer", peer.Addr())
126
// subscribe the peer to the topic
127
s.subscribePeerToTopic(peer)
128
···
201
}
202
203
func (s *Server) handleUnsubscribe(peer *peer.Peer) {
204
+
slog.Info("handling unsubscriber", "peer", peer.Addr())
205
op := func(conn net.Conn) error {
206
// get the topics the peer wishes to unsubscribe from
207
dataLen, err := dataLength(conn)
···
246
}
247
248
func (s *Server) handlePublish(peer *peer.Peer) {
249
+
slog.Info("handling publisher", "peer", peer.Addr())
250
for {
251
var message *messageToSend
252
···
322
}
323
324
func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string) {
325
+
slog.Info("subscribing peer to topics", "topics", topics, "peer", peer.Addr())
326
for _, topic := range topics {
327
s.addSubsciberToTopic(topic, peer)
328
}
···
346
}
347
348
func (s *Server) unsubscribeToTopics(peer *peer.Peer, topics []string) {
349
+
slog.Info("unsubscribing peer from topics", "topics", topics, "peer", peer.Addr())
350
for _, topic := range topics {
351
s.removeSubsciberFromTopic(topic, peer)
352
}