+2
-2
server/peer.go
+2
-2
server/peer.go
+10
-8
server/server.go
+10
-8
server/server.go
···
70
70
71
71
func (s *Server) handleConn(conn net.Conn) {
72
72
peer := newPeer(conn)
73
-
action, err := readAction(peer)
73
+
74
+
action, err := readAction(peer, 0)
74
75
if err != nil {
75
76
slog.Error("failed to read action from peer", "error", err, "peer", peer.addr())
76
77
return
···
78
79
79
80
switch action {
80
81
case Subscribe:
81
-
s.handleSubscribe(&peer)
82
+
s.handleSubscribe(peer)
82
83
case Unsubscribe:
83
-
s.handleUnsubscribe(&peer)
84
+
s.handleUnsubscribe(peer)
84
85
case Publish:
85
-
s.handlePublish(&peer)
86
+
s.handlePublish(peer)
86
87
default:
87
88
slog.Error("unknown action", "action", action, "peer", peer.addr())
88
89
writeStatus(Error, "unknown action", peer.conn)
···
96
97
// keep handling the peers connection, getting the action from the peer when it wishes to do something else.
97
98
// once the peers connection ends, it will be unsubscribed from all topics and returned
98
99
for {
99
-
action, err := readAction(*peer)
100
+
action, err := readAction(peer, time.Millisecond*100)
100
101
if err != nil {
101
102
var neterr net.Error
102
103
if errors.As(err, &neterr) && neterr.Timeout() {
···
338
339
return nil
339
340
}
340
341
341
-
// TODO: work out why this can't take a pointer to the peer
342
-
func readAction(peer peer) (Action, error) {
342
+
func readAction(peer *peer, timeout time.Duration) (Action, error) {
343
343
var action Action
344
344
op := func(conn net.Conn) error {
345
-
conn.SetReadDeadline(time.Now().Add(time.Second))
345
+
if timeout > 0 {
346
+
conn.SetReadDeadline(time.Now().Add(timeout))
347
+
}
346
348
347
349
err := binary.Read(conn, binary.BigEndian, &action)
348
350
if err != nil {