+10
-1
example/main.go
+10
-1
example/main.go
···
8
8
"time"
9
9
10
10
"github.com/willdot/messagebroker/pubsub"
11
+
"github.com/willdot/messagebroker/server"
11
12
)
12
13
13
14
var consumeOnly *bool
15
+
var consumeFrom *int
14
16
15
17
func main() {
16
18
consumeOnly = flag.Bool("consume-only", false, "just consumes (doesn't start server and doesn't publish)")
19
+
consumeFrom = flag.Int("consume-from", -1, "index of message to start consuming from. If not set it will consume from the most recent")
17
20
flag.Parse()
18
21
19
22
if !*consumeOnly {
···
28
31
defer func() {
29
32
_ = sub.Close()
30
33
}()
34
+
startAt := 0
35
+
startAtType := server.Current
36
+
if *consumeFrom >= 0-1 {
37
+
startAtType = server.From
38
+
startAt = *consumeFrom
39
+
}
31
40
32
-
err = sub.SubscribeToTopics([]string{"topic a"})
41
+
err = sub.SubscribeToTopics([]string{"topic a"}, startAtType, startAt)
33
42
if err != nil {
34
43
panic(err)
35
44
}
+1
-1
example/server/main.go
+1
-1
example/server/main.go
+92
-73
pubsub/subscriber.go
+92
-73
pubsub/subscriber.go
···
39
39
}
40
40
41
41
// SubscribeToTopics will subscribe to the provided topics
42
-
func (s *Subscriber) SubscribeToTopics(topicNames []string) error {
42
+
func (s *Subscriber) SubscribeToTopics(topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
43
43
op := func(conn net.Conn) error {
44
-
actionB := make([]byte, 2)
45
-
binary.BigEndian.PutUint16(actionB, server.Subscribed)
46
-
headers := actionB
44
+
return subscribeToTopics(conn, topicNames, startAtType, startAtIndex)
45
+
}
46
+
47
+
return s.connOperation(op)
48
+
}
47
49
48
-
b, err := json.Marshal(topicNames)
49
-
if err != nil {
50
-
return fmt.Errorf("failed to marshal topic names: %w", err)
51
-
}
50
+
// UnsubscribeToTopics will unsubscribe to the provided topics
51
+
func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
52
+
op := func(conn net.Conn) error {
53
+
return unsubscribeToTopics(conn, topicNames)
54
+
}
52
55
53
-
topicNamesB := make([]byte, 4)
54
-
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
55
-
headers = append(headers, topicNamesB...)
56
+
return s.connOperation(op)
57
+
}
58
+
59
+
func subscribeToTopics(conn net.Conn, topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
60
+
actionB := make([]byte, 2)
61
+
binary.BigEndian.PutUint16(actionB, uint16(server.Subscribe))
62
+
headers := actionB
63
+
64
+
b, err := json.Marshal(topicNames)
65
+
if err != nil {
66
+
return fmt.Errorf("failed to marshal topic names: %w", err)
67
+
}
68
+
69
+
topicNamesB := make([]byte, 4)
70
+
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
71
+
headers = append(headers, topicNamesB...)
72
+
headers = append(headers, b...)
73
+
74
+
startAtTypeB := make([]byte, 2)
75
+
binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType))
76
+
headers = append(headers, startAtTypeB...)
56
77
57
-
_, err = conn.Write(append(headers, b...))
58
-
if err != nil {
59
-
return fmt.Errorf("failed to subscribe to topics: %w", err)
60
-
}
78
+
if startAtType == server.From {
79
+
fromB := make([]byte, 2)
80
+
binary.BigEndian.PutUint16(fromB, uint16(startAtIndex))
81
+
headers = append(headers, fromB...)
82
+
}
61
83
62
-
var resp server.Status
63
-
err = binary.Read(conn, binary.BigEndian, &resp)
64
-
if err != nil {
65
-
return fmt.Errorf("failed to read confirmation of subscription: %w", err)
66
-
}
84
+
_, err = conn.Write(headers)
85
+
if err != nil {
86
+
return fmt.Errorf("failed to subscribe to topics: %w", err)
87
+
}
67
88
68
-
if resp == server.Subscribed {
69
-
return nil
70
-
}
89
+
var resp server.Status
90
+
err = binary.Read(conn, binary.BigEndian, &resp)
91
+
if err != nil {
92
+
return fmt.Errorf("failed to read confirmation of subscribe: %w", err)
93
+
}
71
94
72
-
var dataLen uint32
73
-
err = binary.Read(conn, binary.BigEndian, &dataLen)
74
-
if err != nil {
75
-
return fmt.Errorf("received status %s:", resp)
76
-
}
95
+
if resp == server.Subscribed {
96
+
return nil
97
+
}
77
98
78
-
buf := make([]byte, dataLen)
79
-
_, err = conn.Read(buf)
80
-
if err != nil {
81
-
return fmt.Errorf("received status %s:", resp)
82
-
}
99
+
var dataLen uint32
100
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
101
+
if err != nil {
102
+
return fmt.Errorf("received status %s:", resp)
103
+
}
83
104
84
-
return fmt.Errorf("received status %s - %s", resp, buf)
105
+
buf := make([]byte, dataLen)
106
+
_, err = conn.Read(buf)
107
+
if err != nil {
108
+
return fmt.Errorf("received status %s:", resp)
85
109
}
86
110
87
-
return s.connOperation(op)
111
+
return fmt.Errorf("received status %s - %s", resp, buf)
88
112
}
89
113
90
-
// UnsubscribeToTopics will unsubscribe to the provided topics
91
-
func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
92
-
op := func(conn net.Conn) error {
93
-
actionB := make([]byte, 2)
94
-
binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe))
95
-
headers := actionB
114
+
func unsubscribeToTopics(conn net.Conn, topicNames []string) error {
115
+
actionB := make([]byte, 2)
116
+
binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe))
117
+
headers := actionB
96
118
97
-
b, err := json.Marshal(topicNames)
98
-
if err != nil {
99
-
return fmt.Errorf("failed to marshal topic names: %w", err)
100
-
}
101
-
102
-
topicNamesB := make([]byte, 4)
103
-
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
104
-
headers = append(headers, topicNamesB...)
119
+
b, err := json.Marshal(topicNames)
120
+
if err != nil {
121
+
return fmt.Errorf("failed to marshal topic names: %w", err)
122
+
}
105
123
106
-
_, err = conn.Write(append(headers, b...))
107
-
if err != nil {
108
-
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
109
-
}
124
+
topicNamesB := make([]byte, 4)
125
+
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
126
+
headers = append(headers, topicNamesB...)
110
127
111
-
var resp server.Status
112
-
err = binary.Read(conn, binary.BigEndian, &resp)
113
-
if err != nil {
114
-
return fmt.Errorf("failed to read confirmation of unsubscription: %w", err)
115
-
}
128
+
_, err = conn.Write(append(headers, b...))
129
+
if err != nil {
130
+
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
131
+
}
116
132
117
-
if resp == server.Unsubscribed {
118
-
return nil
119
-
}
133
+
var resp server.Status
134
+
err = binary.Read(conn, binary.BigEndian, &resp)
135
+
if err != nil {
136
+
return fmt.Errorf("failed to read confirmation of unsubscribe: %w", err)
137
+
}
120
138
121
-
var dataLen uint32
122
-
err = binary.Read(conn, binary.BigEndian, &dataLen)
123
-
if err != nil {
124
-
return fmt.Errorf("received status %s:", resp)
125
-
}
139
+
if resp == server.Unsubscribed {
140
+
return nil
141
+
}
126
142
127
-
buf := make([]byte, dataLen)
128
-
_, err = conn.Read(buf)
129
-
if err != nil {
130
-
return fmt.Errorf("received status %s:", resp)
131
-
}
143
+
var dataLen uint32
144
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
145
+
if err != nil {
146
+
return fmt.Errorf("received status %s:", resp)
147
+
}
132
148
133
-
return fmt.Errorf("received status %s - %s", resp, buf)
149
+
buf := make([]byte, dataLen)
150
+
_, err = conn.Read(buf)
151
+
if err != nil {
152
+
return fmt.Errorf("received status %s:", resp)
134
153
}
135
154
136
-
return s.connOperation(op)
155
+
return fmt.Errorf("received status %s - %s", resp, buf)
137
156
}
138
157
139
158
// Consumer allows the consumption of messages. If during the consumer receiving messages from the
+15
-4
pubsub/subscriber_test.go
+15
-4
pubsub/subscriber_test.go
···
18
18
topicB = "topic b"
19
19
)
20
20
21
+
type fakeStore struct {
22
+
}
23
+
24
+
func (f *fakeStore) Write(msg server.MessageToSend) error {
25
+
return nil
26
+
}
27
+
func (f *fakeStore) ReadFrom(offset int, handleFunc func(msgs []server.MessageToSend)) error {
28
+
return nil
29
+
}
30
+
21
31
func createServer(t *testing.T) {
22
-
server, err := server.New(serverAddr, time.Millisecond*100, time.Millisecond*100)
32
+
fs := &fakeStore{}
33
+
server, err := server.New(serverAddr, time.Millisecond*100, time.Millisecond*100, fs)
23
34
require.NoError(t, err)
24
35
25
36
t.Cleanup(func() {
···
75
86
76
87
topics := []string{topicA, topicB}
77
88
78
-
err = sub.SubscribeToTopics(topics)
89
+
err = sub.SubscribeToTopics(topics, server.Current, 0)
79
90
require.NoError(t, err)
80
91
}
81
92
···
91
102
92
103
topics := []string{topicA, topicB}
93
104
94
-
err = sub.SubscribeToTopics(topics)
105
+
err = sub.SubscribeToTopics(topics, server.Current, 0)
95
106
require.NoError(t, err)
96
107
97
108
err = sub.UnsubscribeToTopics([]string{topicA})
···
258
269
259
270
topics := []string{topicA, topicB}
260
271
261
-
err = sub.SubscribeToTopics(topics)
272
+
err = sub.SubscribeToTopics(topics, server.Current, 0)
262
273
require.NoError(t, err)
263
274
264
275
ctx, cancel := context.WithCancel(context.Background())
+47
server/message_store.go
+47
server/message_store.go
···
1
+
package server
2
+
3
+
import (
4
+
"fmt"
5
+
"sync"
6
+
)
7
+
8
+
type MemoryStore struct {
9
+
mu sync.Mutex
10
+
msgs map[int]MessageToSend
11
+
offset int
12
+
}
13
+
14
+
func NewMemoryStore() *MemoryStore {
15
+
return &MemoryStore{
16
+
msgs: make(map[int]MessageToSend),
17
+
}
18
+
}
19
+
20
+
func (m *MemoryStore) Write(msg MessageToSend) error {
21
+
m.mu.Lock()
22
+
defer m.mu.Unlock()
23
+
24
+
m.msgs[m.offset] = msg
25
+
26
+
m.offset++
27
+
28
+
return nil
29
+
}
30
+
31
+
func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msgs []MessageToSend)) error {
32
+
if offset < 0 || offset > m.offset {
33
+
return fmt.Errorf("invalid offset provided")
34
+
}
35
+
36
+
m.mu.Lock()
37
+
defer m.mu.Unlock()
38
+
39
+
msgs := make([]MessageToSend, 0, len(m.msgs))
40
+
for i := offset; i < len(m.msgs); i++ {
41
+
msgs = append(msgs, m.msgs[i])
42
+
}
43
+
44
+
handleFunc(msgs)
45
+
46
+
return nil
47
+
}
+95
-30
server/server.go
+95
-30
server/server.go
···
27
27
Nack Action = 5
28
28
)
29
29
30
+
func (a Action) String() string {
31
+
switch a {
32
+
case Subscribe:
33
+
return "subscribe"
34
+
case Unsubscribe:
35
+
return "unsubscribe"
36
+
case Publish:
37
+
return "publish"
38
+
case Ack:
39
+
return "ack"
40
+
case Nack:
41
+
return "nack"
42
+
}
43
+
44
+
return ""
45
+
}
46
+
30
47
// Status represents the status of a request
31
48
type Status uint16
32
49
33
50
const (
34
-
Subscribed = 1
35
-
Unsubscribed = 2
36
-
Error = 3
51
+
Subscribed Status = 1
52
+
Unsubscribed Status = 2
53
+
Error Status = 3
37
54
)
38
55
39
56
func (s Status) String() string {
···
49
66
return ""
50
67
}
51
68
69
+
// StartAtType represents where the subcriber wishes to start subscribing to a topic from
70
+
type StartAtType uint16
71
+
72
+
const (
73
+
Begining StartAtType = 0
74
+
Current StartAtType = 1
75
+
From StartAtType = 2
76
+
)
77
+
78
+
type Store interface {
79
+
Write(msg MessageToSend) error
80
+
ReadFrom(offset int, handleFunc func(msgs []MessageToSend)) error
81
+
}
82
+
52
83
// Server accepts subscribe and publish connections and passes messages around
53
84
type Server struct {
54
85
Addr string
···
59
90
60
91
ackDelay time.Duration
61
92
ackTimeout time.Duration
93
+
94
+
messageStore Store
62
95
}
63
96
64
97
// New creates and starts a new server
65
-
func New(Addr string, ackDelay, ackTimeout time.Duration) (*Server, error) {
98
+
func New(Addr string, ackDelay, ackTimeout time.Duration, messageStore Store) (*Server, error) {
66
99
lis, err := net.Listen("tcp", Addr)
67
100
if err != nil {
68
101
return nil, fmt.Errorf("failed to listen: %w", err)
69
102
}
70
103
71
104
srv := &Server{
72
-
lis: lis,
73
-
topics: map[string]*topic{},
74
-
ackDelay: ackDelay,
75
-
ackTimeout: ackTimeout,
105
+
lis: lis,
106
+
topics: map[string]*topic{},
107
+
ackDelay: ackDelay,
108
+
ackTimeout: ackTimeout,
109
+
messageStore: messageStore,
76
110
}
77
111
78
112
go srv.start()
···
191
225
}
192
226
193
227
var topics []string
228
+
fmt.Println(string(buf))
194
229
err = json.Unmarshal(buf, &topics)
195
230
if err != nil {
196
231
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.Addr())
···
198
233
return nil
199
234
}
200
235
201
-
s.subscribeToTopics(peer, topics)
236
+
var startAtType StartAtType
237
+
err = binary.Read(conn, binary.BigEndian, &startAtType)
238
+
if err != nil {
239
+
slog.Error(err.Error(), "peer", peer.Addr())
240
+
writeStatus(Error, "invalid start at type provided", conn)
241
+
return nil
242
+
}
243
+
var startAt int
244
+
switch startAtType {
245
+
case From:
246
+
// read the from
247
+
var s uint16
248
+
err = binary.Read(conn, binary.BigEndian, &s)
249
+
if err != nil {
250
+
slog.Error(err.Error(), "peer", peer.Addr())
251
+
writeStatus(Error, "invalid start at value provided", conn)
252
+
return nil
253
+
}
254
+
startAt = int(s)
255
+
case Begining:
256
+
startAt = 0
257
+
case Current:
258
+
startAt = -1
259
+
default:
260
+
slog.Error("invalid start up type provided", "start up type", startAtType)
261
+
writeStatus(Error, "invalid start up type provided", conn)
262
+
return nil
263
+
}
264
+
265
+
s.subscribeToTopics(peer, topics, startAt)
202
266
writeStatus(Subscribed, "", conn)
203
267
204
268
return nil
···
247
311
_ = peer.RunConnOperation(op)
248
312
}
249
313
250
-
type messageToSend struct {
314
+
type MessageToSend struct {
251
315
topic string
252
316
data []byte
253
317
}
···
255
319
func (s *Server) handlePublish(peer *peer.Peer) {
256
320
slog.Info("handling publisher", "peer", peer.Addr())
257
321
for {
258
-
var message *messageToSend
322
+
var message *MessageToSend
259
323
260
324
op := func(conn net.Conn) error {
261
325
dataLen, err := dataLength(conn)
···
304
368
return nil
305
369
}
306
370
307
-
message = &messageToSend{
371
+
message = &MessageToSend{
308
372
topic: topicStr,
309
373
data: dataBuf,
310
374
}
311
-
return nil
312
-
}
313
375
314
-
_ = peer.RunConnOperation(op)
376
+
topic := s.getTopic(message.topic)
377
+
if topic == nil {
378
+
topic = newTopic(message.topic, s.messageStore)
379
+
s.topics[message.topic] = topic
380
+
}
315
381
316
-
if message == nil {
317
-
continue
382
+
err = topic.sendMessageToSubscribers(*message)
383
+
if err != nil {
384
+
slog.Error("failed to send message to subscribers", "error", err, "peer", peer.Addr())
385
+
writeStatus(Error, "failed to send message to subscribers", conn)
386
+
return nil
387
+
}
388
+
389
+
return nil
318
390
}
319
391
320
-
// sending messages to the subscribers can be done async because the publisher doesn't need to wait for
321
-
// subscribers to be sent the message
322
-
go func() {
323
-
topic := s.getTopic(message.topic)
324
-
if topic != nil {
325
-
topic.sendMessageToSubscribers(message.data)
326
-
}
327
-
}()
392
+
_ = peer.RunConnOperation(op)
328
393
}
329
394
}
330
395
331
-
func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string) {
396
+
func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string, startAt int) {
332
397
slog.Info("subscribing peer to topics", "topics", topics, "peer", peer.Addr())
333
398
for _, topic := range topics {
334
-
s.addSubsciberToTopic(topic, peer)
399
+
s.addSubsciberToTopic(topic, peer, startAt)
335
400
}
336
401
}
337
402
338
-
func (s *Server) addSubsciberToTopic(topicName string, peer *peer.Peer) {
403
+
func (s *Server) addSubsciberToTopic(topicName string, peer *peer.Peer, startAt int) {
339
404
s.mu.Lock()
340
405
defer s.mu.Unlock()
341
406
342
407
t, ok := s.topics[topicName]
343
408
if !ok {
344
-
t = newTopic(topicName)
409
+
t = newTopic(topicName, s.messageStore)
345
410
}
346
411
347
-
t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout)
412
+
t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout, s.messageStore, startAt)
348
413
349
414
s.topics[topicName] = t
350
415
}
+153
-63
server/server_test.go
+153
-63
server/server_test.go
···
24
24
)
25
25
26
26
func createServer(t *testing.T) *Server {
27
-
srv, err := New(serverAddr, ackDelay, ackTimeout)
27
+
store := NewMemoryStore()
28
+
srv, err := New(serverAddr, ackDelay, ackTimeout, store)
28
29
require.NoError(t, err)
29
30
30
31
t.Cleanup(func() {
···
44
45
return srv
45
46
}
46
47
47
-
func createConnectionAndSubscribe(t *testing.T, topics []string) net.Conn {
48
+
func createConnectionAndSubscribe(t *testing.T, topics []string, startAtType StartAtType, startAtIndex int) net.Conn {
48
49
conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
49
50
require.NoError(t, err)
50
51
51
-
subscribeOrUnsubscribetoTopics(t, conn, topics, Subscribe)
52
+
subscribeToTopics(t, conn, topics, startAtType, startAtIndex)
52
53
53
54
expectedRes := Subscribed
54
55
···
56
57
err = binary.Read(conn, binary.BigEndian, &resp)
57
58
require.NoError(t, err)
58
59
59
-
assert.Equal(t, expectedRes, int(resp))
60
+
assert.Equal(t, expectedRes, resp)
60
61
61
62
return conn
62
63
}
···
76
77
require.NoError(t, err)
77
78
}
78
79
79
-
func subscribeOrUnsubscribetoTopics(t *testing.T, conn net.Conn, topics []string, action Action) {
80
+
func subscribeToTopics(t *testing.T, conn net.Conn, topics []string, startAtType StartAtType, startAtIndex int) {
80
81
actionB := make([]byte, 2)
81
-
binary.BigEndian.PutUint16(actionB, uint16(action))
82
+
binary.BigEndian.PutUint16(actionB, uint16(Subscribe))
83
+
headers := actionB
84
+
85
+
b, err := json.Marshal(topics)
86
+
require.NoError(t, err)
87
+
88
+
topicNamesB := make([]byte, 4)
89
+
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
90
+
headers = append(headers, topicNamesB...)
91
+
headers = append(headers, b...)
92
+
93
+
startAtTypeB := make([]byte, 2)
94
+
binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType))
95
+
headers = append(headers, startAtTypeB...)
96
+
97
+
if startAtType == From {
98
+
fromB := make([]byte, 2)
99
+
binary.BigEndian.PutUint16(fromB, uint16(startAtIndex))
100
+
headers = append(headers, fromB...)
101
+
}
102
+
103
+
_, err = conn.Write(headers)
104
+
require.NoError(t, err)
105
+
}
106
+
107
+
func unsubscribetoTopics(t *testing.T, conn net.Conn, topics []string) {
108
+
actionB := make([]byte, 2)
109
+
binary.BigEndian.PutUint16(actionB, uint16(Unsubscribe))
82
110
headers := actionB
83
111
84
112
b, err := json.Marshal(topics)
···
97
125
// existing topic
98
126
srv := createServerWithExistingTopic(t, topicA)
99
127
100
-
_ = createConnectionAndSubscribe(t, []string{topicA, topicB})
128
+
_ = createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
101
129
102
130
assert.Len(t, srv.topics, 2)
103
131
assert.Len(t, srv.topics[topicA].subscriptions, 1)
···
107
135
func TestUnsubscribesFromTopic(t *testing.T) {
108
136
srv := createServerWithExistingTopic(t, topicA)
109
137
110
-
conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC})
138
+
conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC}, Current, 0)
111
139
112
140
assert.Len(t, srv.topics, 3)
113
141
assert.Len(t, srv.topics[topicA].subscriptions, 1)
···
116
144
117
145
topics := []string{topicA, topicB}
118
146
119
-
subscribeOrUnsubscribetoTopics(t, conn, topics, Unsubscribe)
147
+
unsubscribetoTopics(t, conn, topics)
120
148
121
149
expectedRes := Unsubscribed
122
150
···
124
152
err := binary.Read(conn, binary.BigEndian, &resp)
125
153
require.NoError(t, err)
126
154
127
-
assert.Equal(t, expectedRes, int(resp))
155
+
assert.Equal(t, expectedRes, resp)
128
156
129
157
assert.Len(t, srv.topics, 3)
130
158
assert.Len(t, srv.topics[topicA].subscriptions, 0)
···
135
163
func TestSubscriberClosesWithoutUnsubscribing(t *testing.T) {
136
164
srv := createServer(t)
137
165
138
-
conn := createConnectionAndSubscribe(t, []string{topicA, topicB})
166
+
conn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
139
167
140
168
assert.Len(t, srv.topics, 2)
141
169
assert.Len(t, srv.topics[topicA].subscriptions, 1)
···
175
203
err = binary.Read(conn, binary.BigEndian, &resp)
176
204
require.NoError(t, err)
177
205
178
-
assert.Equal(t, expectedRes, int(resp))
206
+
assert.Equal(t, expectedRes, resp)
179
207
180
208
expectedMessage := "unknown action"
181
209
···
213
241
err = binary.Read(publisherConn, binary.BigEndian, &resp)
214
242
require.NoError(t, err)
215
243
216
-
assert.Equal(t, expectedRes, int(resp))
244
+
assert.Equal(t, expectedRes, resp)
217
245
218
246
expectedMessage := "topic data does not contain 'topic:' prefix"
219
247
···
234
262
235
263
subscribers := make([]net.Conn, 0, 10)
236
264
for i := 0; i < 10; i++ {
237
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
265
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
238
266
239
267
subscribers = append(subscribers, subscriberConn)
240
268
}
···
252
280
253
281
// check the subsribers got the data
254
282
for _, conn := range subscribers {
255
-
var topicLen uint64
256
-
err = binary.Read(conn, binary.BigEndian, &topicLen)
257
-
require.NoError(t, err)
258
-
259
-
topicBuf := make([]byte, topicLen)
260
-
_, err = conn.Read(topicBuf)
261
-
require.NoError(t, err)
262
-
assert.Equal(t, topicA, string(topicBuf))
263
-
264
-
var dataLen uint64
265
-
err = binary.Read(conn, binary.BigEndian, &dataLen)
266
-
require.NoError(t, err)
267
-
268
-
buf := make([]byte, dataLen)
269
-
n, err := conn.Read(buf)
270
-
require.NoError(t, err)
271
-
272
-
require.Equal(t, int(dataLen), n)
273
-
274
-
assert.Equal(t, messageData, string(buf))
275
-
276
-
err = binary.Write(conn, binary.BigEndian, Ack)
277
-
require.NoError(t, err)
283
+
msg := readMessage(t, conn)
284
+
assert.Equal(t, messageData, string(msg))
278
285
}
279
286
}
280
287
···
294
301
295
302
subscribeFinCh := make(chan struct{})
296
303
// create a subscriber that will read messages
297
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
304
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
298
305
go func() {
299
306
// check subscriber got all messages
300
307
results := make([]string, 0, len(messages))
301
308
for i := 0; i < len(messages); i++ {
302
-
var topicLen uint64
303
-
err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
304
-
require.NoError(t, err)
305
-
306
-
topicBuf := make([]byte, topicLen)
307
-
_, err = subscriberConn.Read(topicBuf)
308
-
require.NoError(t, err)
309
-
assert.Equal(t, topicA, string(topicBuf))
310
-
311
-
var dataLen uint64
312
-
err = binary.Read(subscriberConn, binary.BigEndian, &dataLen)
313
-
require.NoError(t, err)
314
-
315
-
buf := make([]byte, dataLen)
316
-
n, err := subscriberConn.Read(buf)
317
-
require.NoError(t, err)
318
-
require.Equal(t, int(dataLen), n)
319
-
320
-
results = append(results, string(buf))
321
-
322
-
err = binary.Write(subscriberConn, binary.BigEndian, Ack)
323
-
require.NoError(t, err)
309
+
msg := readMessage(t, subscriberConn)
310
+
results = append(results, string(msg))
324
311
}
325
312
326
313
assert.ElementsMatch(t, results, messages)
···
346
333
func TestSendsDataToTopicSubscriberNacksThenAcks(t *testing.T) {
347
334
_ = createServer(t)
348
335
349
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
336
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
350
337
351
338
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
352
339
require.NoError(t, err)
···
400
387
func TestSendsDataToTopicSubscriberDoesntAckMessage(t *testing.T) {
401
388
_ = createServer(t)
402
389
403
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
390
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
404
391
405
392
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
406
393
require.NoError(t, err)
···
458
445
func TestSendsDataToTopicSubscriberDeliveryCountTooHighWithNoAck(t *testing.T) {
459
446
_ = createServer(t)
460
447
461
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
448
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
462
449
463
450
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
464
451
require.NoError(t, err)
···
514
501
err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
515
502
require.Error(t, err)
516
503
}
504
+
505
+
func TestSubscribeAndReplaysFromStart(t *testing.T) {
506
+
_ = createServer(t)
507
+
508
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
509
+
require.NoError(t, err)
510
+
511
+
err = binary.Write(publisherConn, binary.BigEndian, Publish)
512
+
require.NoError(t, err)
513
+
514
+
messages := make([]string, 0, 10)
515
+
for i := 0; i < 10; i++ {
516
+
messages = append(messages, fmt.Sprintf("message %d", i))
517
+
}
518
+
519
+
// send messages first
520
+
topic := fmt.Sprintf("topic:%s", topicA)
521
+
522
+
// send multiple messages
523
+
for _, msg := range messages {
524
+
sendMessage(t, publisherConn, topic, []byte(msg))
525
+
}
526
+
527
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 0)
528
+
results := make([]string, 0, len(messages))
529
+
for i := 0; i < len(messages); i++ {
530
+
msg := readMessage(t, subscriberConn)
531
+
results = append(results, string(msg))
532
+
}
533
+
assert.ElementsMatch(t, results, messages)
534
+
}
535
+
536
+
func TestSubscribeAndReplaysFromIndex(t *testing.T) {
537
+
_ = createServer(t)
538
+
539
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
540
+
require.NoError(t, err)
541
+
542
+
err = binary.Write(publisherConn, binary.BigEndian, Publish)
543
+
require.NoError(t, err)
544
+
545
+
messages := make([]string, 0, 10)
546
+
for i := 0; i < 10; i++ {
547
+
messages = append(messages, fmt.Sprintf("message %d", i))
548
+
}
549
+
550
+
// send messages first
551
+
topic := fmt.Sprintf("topic:%s", topicA)
552
+
553
+
// send multiple messages
554
+
for _, msg := range messages {
555
+
sendMessage(t, publisherConn, topic, []byte(msg))
556
+
}
557
+
558
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 3)
559
+
560
+
// now that the subscriber has subecribed send another message that should arrive after all the other messages were consumed
561
+
sendMessage(t, publisherConn, topic, []byte("hello there"))
562
+
563
+
results := make([]string, 0, len(messages))
564
+
for i := 0; i < len(messages)-3; i++ {
565
+
msg := readMessage(t, subscriberConn)
566
+
results = append(results, string(msg))
567
+
}
568
+
require.Len(t, results, 7)
569
+
expMessages := make([]string, 0, 7)
570
+
for i, msg := range messages {
571
+
if i < 3 {
572
+
continue
573
+
}
574
+
expMessages = append(expMessages, msg)
575
+
}
576
+
assert.Equal(t, expMessages, results)
577
+
578
+
// now check we can get the message that was sent after the subscription was created
579
+
msg := readMessage(t, subscriberConn)
580
+
assert.Equal(t, "hello there", string(msg))
581
+
}
582
+
583
+
func readMessage(t *testing.T, subscriberConn net.Conn) []byte {
584
+
var topicLen uint64
585
+
err := binary.Read(subscriberConn, binary.BigEndian, &topicLen)
586
+
require.NoError(t, err)
587
+
588
+
topicBuf := make([]byte, topicLen)
589
+
_, err = subscriberConn.Read(topicBuf)
590
+
require.NoError(t, err)
591
+
assert.Equal(t, topicA, string(topicBuf))
592
+
593
+
var dataLen uint64
594
+
err = binary.Read(subscriberConn, binary.BigEndian, &dataLen)
595
+
require.NoError(t, err)
596
+
597
+
buf := make([]byte, dataLen)
598
+
n, err := subscriberConn.Read(buf)
599
+
require.NoError(t, err)
600
+
require.Equal(t, int(dataLen), n)
601
+
602
+
err = binary.Write(subscriberConn, binary.BigEndian, Ack)
603
+
require.NoError(t, err)
604
+
605
+
return buf
606
+
}
+20
-2
server/subscriber.go
+20
-2
server/subscriber.go
···
29
29
return message{data: data, deliveryCount: 1}
30
30
}
31
31
32
-
func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration) *subscriber {
32
+
func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration, messageStore Store, startAt int) *subscriber {
33
33
s := &subscriber{
34
34
peer: peer,
35
35
topic: topic,
36
36
messages: make(chan message),
37
37
ackDelay: ackDelay,
38
38
ackTimeout: ackTimeout,
39
-
unsubscribeCh: make(chan struct{}),
39
+
unsubscribeCh: make(chan struct{}, 1),
40
40
}
41
41
42
42
go s.sendMessages()
43
+
44
+
offset := startAt
45
+
46
+
go func() {
47
+
// here we need to replay all messages from the store for the topic.
48
+
err := messageStore.ReadFrom(offset, func(msgs []MessageToSend) {
49
+
// go func() {
50
+
for _, msg := range msgs {
51
+
s.messages <- newMessage(msg.data)
52
+
}
53
+
// }()
54
+
})
55
+
if err != nil {
56
+
slog.Error("failed to replay messages from offset", "error", err, "offset", offset)
57
+
}
58
+
}()
43
59
44
60
return s
45
61
}
···
79
95
case <-s.unsubscribeCh:
80
96
return
81
97
case <-timer.C:
98
+
fmt.Printf("waiting to put message on queue: %s\n", msg.data)
82
99
s.messages <- msg
100
+
fmt.Printf("put message on queue: %s\n", msg.data)
83
101
}
84
102
}()
85
103
}
+13
-3
server/topic.go
+13
-3
server/topic.go
···
1
1
package server
2
2
3
3
import (
4
+
"fmt"
4
5
"net"
5
6
"sync"
6
7
)
···
9
10
name string
10
11
subscriptions map[net.Addr]*subscriber
11
12
mu sync.Mutex
13
+
messageStore Store
12
14
}
13
15
14
-
func newTopic(name string) *topic {
16
+
func newTopic(name string, messageStore Store) *topic {
15
17
return &topic{
16
18
name: name,
17
19
subscriptions: make(map[net.Addr]*subscriber),
20
+
messageStore: messageStore,
18
21
}
19
22
}
20
23
21
-
func (t *topic) sendMessageToSubscribers(msgData []byte) {
24
+
func (t *topic) sendMessageToSubscribers(msg MessageToSend) error {
25
+
err := t.messageStore.Write(msg)
26
+
if err != nil {
27
+
return fmt.Errorf("failed to write message to store: %w", err)
28
+
}
29
+
22
30
t.mu.Lock()
23
31
subscribers := t.subscriptions
24
32
t.mu.Unlock()
25
33
26
34
for _, subscriber := range subscribers {
27
-
subscriber.addMessage(newMessage(msgData), 0)
35
+
subscriber.addMessage(newMessage(msg.data), 0)
28
36
}
37
+
38
+
return nil
29
39
}