+2
-12
server/peer.go
+2
-12
server/peer.go
···
1
package server
2
3
import (
4
-
"log/slog"
5
"net"
6
"sync"
7
-
8
-
"github.com/google/uuid"
9
)
10
11
// Status represents the status of a request
···
33
type peer struct {
34
conn net.Conn
35
connMu sync.Mutex
36
-
name string
37
}
38
39
func newPeer(conn net.Conn) peer {
40
return peer{
41
conn: conn,
42
-
name: uuid.New().String(),
43
}
44
}
45
···
50
type connOpp func(conn net.Conn) error
51
52
func (p *peer) connOperation(op connOpp, from string) error {
53
-
slog.Info("operation running", "from", from, "peer", p.conn.RemoteAddr(), "name", p.name, "mu addr", &p.connMu)
54
-
55
p.connMu.Lock()
56
-
err := op(p.conn)
57
-
p.connMu.Unlock()
58
59
-
slog.Info("operation finished", "from", from, "peer", p.conn.RemoteAddr(), "name", p.name, "mu addr", &p.connMu)
60
-
61
-
return err
62
}
···
1
package server
2
3
import (
4
"net"
5
"sync"
6
)
7
8
// Status represents the status of a request
···
30
type peer struct {
31
conn net.Conn
32
connMu sync.Mutex
33
}
34
35
func newPeer(conn net.Conn) peer {
36
return peer{
37
conn: conn,
38
}
39
}
40
···
45
type connOpp func(conn net.Conn) error
46
47
func (p *peer) connOperation(op connOpp, from string) error {
48
p.connMu.Lock()
49
+
defer p.connMu.Unlock()
50
51
+
return op(p.conn)
52
}
+1
-1
server/server.go
+1
-1
server/server.go
+2
-2
server/server_test.go
+2
-2
server/server_test.go
···
215
func TestSendsDataToTopicSubscribers(t *testing.T) {
216
_ = createServer(t)
217
218
-
subscribers := make([]net.Conn, 0, 1)
219
-
for i := 0; i < 1; i++ {
220
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
221
222
subscribers = append(subscribers, subscriberConn)
···
215
func TestSendsDataToTopicSubscribers(t *testing.T) {
216
_ = createServer(t)
217
218
+
subscribers := make([]net.Conn, 0, 10)
219
+
for i := 0; i < 10; i++ {
220
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
221
222
subscribers = append(subscribers, subscriberConn)