+10
-1
example/main.go
+10
-1
example/main.go
···
8
8
"time"
9
9
10
10
"github.com/willdot/messagebroker/pubsub"
11
+
"github.com/willdot/messagebroker/server"
11
12
)
12
13
13
14
var consumeOnly *bool
15
+
var consumeFrom *int
14
16
15
17
func main() {
16
18
consumeOnly = flag.Bool("consume-only", false, "just consumes (doesn't start server and doesn't publish)")
19
+
consumeFrom = flag.Int("consume-from", -1, "index of message to start consuming from. If not set it will consume from the most recent")
17
20
flag.Parse()
18
21
19
22
if !*consumeOnly {
···
28
31
defer func() {
29
32
_ = sub.Close()
30
33
}()
34
+
startAt := 0
35
+
startAtType := server.Current
36
+
if *consumeFrom > -1 {
37
+
startAtType = server.From
38
+
startAt = *consumeFrom
39
+
}
31
40
32
-
err = sub.SubscribeToTopics([]string{"topic a"})
41
+
err = sub.SubscribeToTopics([]string{"topic a"}, startAtType, startAt)
33
42
if err != nil {
34
43
panic(err)
35
44
}
+92
-73
pubsub/subscriber.go
+92
-73
pubsub/subscriber.go
···
39
39
}
40
40
41
41
// SubscribeToTopics will subscribe to the provided topics
42
-
func (s *Subscriber) SubscribeToTopics(topicNames []string) error {
42
+
func (s *Subscriber) SubscribeToTopics(topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
43
43
op := func(conn net.Conn) error {
44
-
actionB := make([]byte, 2)
45
-
binary.BigEndian.PutUint16(actionB, server.Subscribed)
46
-
headers := actionB
44
+
return subscribeToTopics(conn, topicNames, startAtType, startAtIndex)
45
+
}
46
+
47
+
return s.connOperation(op)
48
+
}
47
49
48
-
b, err := json.Marshal(topicNames)
49
-
if err != nil {
50
-
return fmt.Errorf("failed to marshal topic names: %w", err)
51
-
}
50
+
// UnsubscribeToTopics will unsubscribe to the provided topics
51
+
func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
52
+
op := func(conn net.Conn) error {
53
+
return unsubscribeToTopics(conn, topicNames)
54
+
}
52
55
53
-
topicNamesB := make([]byte, 4)
54
-
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
55
-
headers = append(headers, topicNamesB...)
56
+
return s.connOperation(op)
57
+
}
58
+
59
+
func subscribeToTopics(conn net.Conn, topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
60
+
actionB := make([]byte, 2)
61
+
binary.BigEndian.PutUint16(actionB, uint16(server.Subscribe))
62
+
headers := actionB
63
+
64
+
b, err := json.Marshal(topicNames)
65
+
if err != nil {
66
+
return fmt.Errorf("failed to marshal topic names: %w", err)
67
+
}
68
+
69
+
topicNamesB := make([]byte, 4)
70
+
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
71
+
headers = append(headers, topicNamesB...)
72
+
headers = append(headers, b...)
73
+
74
+
startAtTypeB := make([]byte, 2)
75
+
binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType))
76
+
headers = append(headers, startAtTypeB...)
56
77
57
-
_, err = conn.Write(append(headers, b...))
58
-
if err != nil {
59
-
return fmt.Errorf("failed to subscribe to topics: %w", err)
60
-
}
78
+
if startAtType == server.From {
79
+
fromB := make([]byte, 2)
80
+
binary.BigEndian.PutUint16(fromB, uint16(startAtIndex))
81
+
headers = append(headers, fromB...)
82
+
}
61
83
62
-
var resp server.Status
63
-
err = binary.Read(conn, binary.BigEndian, &resp)
64
-
if err != nil {
65
-
return fmt.Errorf("failed to read confirmation of subscription: %w", err)
66
-
}
84
+
_, err = conn.Write(headers)
85
+
if err != nil {
86
+
return fmt.Errorf("failed to subscribe to topics: %w", err)
87
+
}
67
88
68
-
if resp == server.Subscribed {
69
-
return nil
70
-
}
89
+
var resp server.Status
90
+
err = binary.Read(conn, binary.BigEndian, &resp)
91
+
if err != nil {
92
+
return fmt.Errorf("failed to read confirmation of subscribe: %w", err)
93
+
}
71
94
72
-
var dataLen uint32
73
-
err = binary.Read(conn, binary.BigEndian, &dataLen)
74
-
if err != nil {
75
-
return fmt.Errorf("received status %s:", resp)
76
-
}
95
+
if resp == server.Subscribed {
96
+
return nil
97
+
}
77
98
78
-
buf := make([]byte, dataLen)
79
-
_, err = conn.Read(buf)
80
-
if err != nil {
81
-
return fmt.Errorf("received status %s:", resp)
82
-
}
99
+
var dataLen uint32
100
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
101
+
if err != nil {
102
+
return fmt.Errorf("received status %s:", resp)
103
+
}
83
104
84
-
return fmt.Errorf("received status %s - %s", resp, buf)
105
+
buf := make([]byte, dataLen)
106
+
_, err = conn.Read(buf)
107
+
if err != nil {
108
+
return fmt.Errorf("received status %s:", resp)
85
109
}
86
110
87
-
return s.connOperation(op)
111
+
return fmt.Errorf("received status %s - %s", resp, buf)
88
112
}
89
113
90
-
// UnsubscribeToTopics will unsubscribe to the provided topics
91
-
func (s *Subscriber) UnsubscribeToTopics(topicNames []string) error {
92
-
op := func(conn net.Conn) error {
93
-
actionB := make([]byte, 2)
94
-
binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe))
95
-
headers := actionB
114
+
func unsubscribeToTopics(conn net.Conn, topicNames []string) error {
115
+
actionB := make([]byte, 2)
116
+
binary.BigEndian.PutUint16(actionB, uint16(server.Unsubscribe))
117
+
headers := actionB
96
118
97
-
b, err := json.Marshal(topicNames)
98
-
if err != nil {
99
-
return fmt.Errorf("failed to marshal topic names: %w", err)
100
-
}
101
-
102
-
topicNamesB := make([]byte, 4)
103
-
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
104
-
headers = append(headers, topicNamesB...)
119
+
b, err := json.Marshal(topicNames)
120
+
if err != nil {
121
+
return fmt.Errorf("failed to marshal topic names: %w", err)
122
+
}
105
123
106
-
_, err = conn.Write(append(headers, b...))
107
-
if err != nil {
108
-
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
109
-
}
124
+
topicNamesB := make([]byte, 4)
125
+
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
126
+
headers = append(headers, topicNamesB...)
110
127
111
-
var resp server.Status
112
-
err = binary.Read(conn, binary.BigEndian, &resp)
113
-
if err != nil {
114
-
return fmt.Errorf("failed to read confirmation of unsubscription: %w", err)
115
-
}
128
+
_, err = conn.Write(append(headers, b...))
129
+
if err != nil {
130
+
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
131
+
}
116
132
117
-
if resp == server.Unsubscribed {
118
-
return nil
119
-
}
133
+
var resp server.Status
134
+
err = binary.Read(conn, binary.BigEndian, &resp)
135
+
if err != nil {
136
+
return fmt.Errorf("failed to read confirmation of unsubscribe: %w", err)
137
+
}
120
138
121
-
var dataLen uint32
122
-
err = binary.Read(conn, binary.BigEndian, &dataLen)
123
-
if err != nil {
124
-
return fmt.Errorf("received status %s:", resp)
125
-
}
139
+
if resp == server.Unsubscribed {
140
+
return nil
141
+
}
126
142
127
-
buf := make([]byte, dataLen)
128
-
_, err = conn.Read(buf)
129
-
if err != nil {
130
-
return fmt.Errorf("received status %s:", resp)
131
-
}
143
+
var dataLen uint32
144
+
err = binary.Read(conn, binary.BigEndian, &dataLen)
145
+
if err != nil {
146
+
return fmt.Errorf("received status %s:", resp)
147
+
}
132
148
133
-
return fmt.Errorf("received status %s - %s", resp, buf)
149
+
buf := make([]byte, dataLen)
150
+
_, err = conn.Read(buf)
151
+
if err != nil {
152
+
return fmt.Errorf("received status %s:", resp)
134
153
}
135
154
136
-
return s.connOperation(op)
155
+
return fmt.Errorf("received status %s - %s", resp, buf)
137
156
}
138
157
139
158
// Consumer allows the consumption of messages. If during the consumer receiving messages from the
+3
-3
pubsub/subscriber_test.go
+3
-3
pubsub/subscriber_test.go
···
75
75
76
76
topics := []string{topicA, topicB}
77
77
78
-
err = sub.SubscribeToTopics(topics)
78
+
err = sub.SubscribeToTopics(topics, server.Current, 0)
79
79
require.NoError(t, err)
80
80
}
81
81
···
91
91
92
92
topics := []string{topicA, topicB}
93
93
94
-
err = sub.SubscribeToTopics(topics)
94
+
err = sub.SubscribeToTopics(topics, server.Current, 0)
95
95
require.NoError(t, err)
96
96
97
97
err = sub.UnsubscribeToTopics([]string{topicA})
···
258
258
259
259
topics := []string{topicA, topicB}
260
260
261
-
err = sub.SubscribeToTopics(topics)
261
+
err = sub.SubscribeToTopics(topics, server.Current, 0)
262
262
require.NoError(t, err)
263
263
264
264
ctx, cancel := context.WithCancel(context.Background())
+45
server/message_store.go
+45
server/message_store.go
···
1
+
package server
2
+
3
+
import (
4
+
"sync"
5
+
)
6
+
7
+
// Memory store allows messages to be stored in memory
8
+
type MemoryStore struct {
9
+
mu sync.Mutex
10
+
msgs map[int]message
11
+
nextOffset int
12
+
}
13
+
14
+
// New memory store initializes a new in memory store
15
+
func NewMemoryStore() *MemoryStore {
16
+
return &MemoryStore{
17
+
msgs: make(map[int]message),
18
+
}
19
+
}
20
+
21
+
// Write will write the provided message to the in memory store
22
+
func (m *MemoryStore) Write(msg message) error {
23
+
m.mu.Lock()
24
+
defer m.mu.Unlock()
25
+
26
+
m.msgs[m.nextOffset] = msg
27
+
28
+
m.nextOffset++
29
+
30
+
return nil
31
+
}
32
+
33
+
// ReadFrom will read messages from (and including) the provided offset and pass them to the provided handler
34
+
func (m *MemoryStore) ReadFrom(offset int, handleFunc func(msg message)) {
35
+
if offset < 0 || offset >= m.nextOffset {
36
+
return
37
+
}
38
+
39
+
m.mu.Lock()
40
+
defer m.mu.Unlock()
41
+
42
+
for i := offset; i < len(m.msgs); i++ {
43
+
handleFunc(m.msgs[i])
44
+
}
45
+
}
+59
-31
server/server.go
+59
-31
server/server.go
···
31
31
type Status uint16
32
32
33
33
const (
34
-
Subscribed = 1
35
-
Unsubscribed = 2
36
-
Error = 3
34
+
Subscribed Status = 1
35
+
Unsubscribed Status = 2
36
+
Error Status = 3
37
37
)
38
38
39
39
func (s Status) String() string {
···
48
48
49
49
return ""
50
50
}
51
+
52
+
// StartAtType represents where the subcriber wishes to start subscribing to a topic from
53
+
type StartAtType uint16
54
+
55
+
const (
56
+
Beginning StartAtType = 0
57
+
Current StartAtType = 1
58
+
From StartAtType = 2
59
+
)
51
60
52
61
// Server accepts subscribe and publish connections and passes messages around
53
62
type Server struct {
···
198
207
return nil
199
208
}
200
209
201
-
s.subscribeToTopics(peer, topics)
210
+
var startAtType StartAtType
211
+
err = binary.Read(conn, binary.BigEndian, &startAtType)
212
+
if err != nil {
213
+
slog.Error(err.Error(), "peer", peer.Addr())
214
+
writeStatus(Error, "invalid start at type provided", conn)
215
+
return nil
216
+
}
217
+
var startAt int
218
+
switch startAtType {
219
+
case From:
220
+
var s uint16
221
+
err = binary.Read(conn, binary.BigEndian, &s)
222
+
if err != nil {
223
+
slog.Error(err.Error(), "peer", peer.Addr())
224
+
writeStatus(Error, "invalid start at value provided", conn)
225
+
return nil
226
+
}
227
+
startAt = int(s)
228
+
case Beginning:
229
+
startAt = 0
230
+
case Current:
231
+
startAt = -1
232
+
default:
233
+
slog.Error("invalid start up type provided", "start up type", startAtType)
234
+
writeStatus(Error, "invalid start up type provided", conn)
235
+
return nil
236
+
}
237
+
238
+
s.subscribeToTopics(peer, topics, startAt)
202
239
writeStatus(Subscribed, "", conn)
203
240
204
241
return nil
···
247
284
_ = peer.RunConnOperation(op)
248
285
}
249
286
250
-
type messageToSend struct {
251
-
topic string
252
-
data []byte
253
-
}
254
-
255
287
func (s *Server) handlePublish(peer *peer.Peer) {
256
288
slog.Info("handling publisher", "peer", peer.Addr())
257
289
for {
258
-
var message *messageToSend
259
-
260
290
op := func(conn net.Conn) error {
261
291
dataLen, err := dataLength(conn)
262
292
if err != nil {
···
304
334
return nil
305
335
}
306
336
307
-
message = &messageToSend{
308
-
topic: topicStr,
309
-
data: dataBuf,
337
+
topic := s.getTopic(topicStr)
338
+
if topic == nil {
339
+
topic = newTopic(topicStr)
340
+
s.topics[topicStr] = topic
341
+
}
342
+
343
+
message := newMessage(dataBuf)
344
+
345
+
err = topic.sendMessageToSubscribers(message)
346
+
if err != nil {
347
+
slog.Error("failed to send message to subscribers", "error", err, "peer", peer.Addr())
348
+
writeStatus(Error, "failed to send message to subscribers", conn)
349
+
return nil
310
350
}
351
+
311
352
return nil
312
353
}
313
354
314
355
_ = peer.RunConnOperation(op)
315
-
316
-
if message == nil {
317
-
continue
318
-
}
319
-
320
-
// sending messages to the subscribers can be done async because the publisher doesn't need to wait for
321
-
// subscribers to be sent the message
322
-
go func() {
323
-
topic := s.getTopic(message.topic)
324
-
if topic != nil {
325
-
topic.sendMessageToSubscribers(message.data)
326
-
}
327
-
}()
328
356
}
329
357
}
330
358
331
-
func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string) {
359
+
func (s *Server) subscribeToTopics(peer *peer.Peer, topics []string, startAt int) {
332
360
slog.Info("subscribing peer to topics", "topics", topics, "peer", peer.Addr())
333
361
for _, topic := range topics {
334
-
s.addSubsciberToTopic(topic, peer)
362
+
s.addSubsciberToTopic(topic, peer, startAt)
335
363
}
336
364
}
337
365
338
-
func (s *Server) addSubsciberToTopic(topicName string, peer *peer.Peer) {
366
+
func (s *Server) addSubsciberToTopic(topicName string, peer *peer.Peer, startAt int) {
339
367
s.mu.Lock()
340
368
defer s.mu.Unlock()
341
369
···
344
372
t = newTopic(topicName)
345
373
}
346
374
347
-
t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout)
375
+
t.subscriptions[peer.Addr()] = newSubscriber(peer, t, s.ackDelay, s.ackTimeout, startAt)
348
376
349
377
s.topics[topicName] = t
350
378
}
+163
-62
server/server_test.go
+163
-62
server/server_test.go
···
39
39
srv.topics[topicName] = &topic{
40
40
name: topicName,
41
41
subscriptions: make(map[net.Addr]*subscriber),
42
+
messageStore: NewMemoryStore(),
42
43
}
43
44
44
45
return srv
45
46
}
46
47
47
-
func createConnectionAndSubscribe(t *testing.T, topics []string) net.Conn {
48
+
func createConnectionAndSubscribe(t *testing.T, topics []string, startAtType StartAtType, startAtIndex int) net.Conn {
48
49
conn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
49
50
require.NoError(t, err)
50
51
51
-
subscribeOrUnsubscribetoTopics(t, conn, topics, Subscribe)
52
+
subscribeToTopics(t, conn, topics, startAtType, startAtIndex)
52
53
53
54
expectedRes := Subscribed
54
55
···
56
57
err = binary.Read(conn, binary.BigEndian, &resp)
57
58
require.NoError(t, err)
58
59
59
-
assert.Equal(t, expectedRes, int(resp))
60
+
assert.Equal(t, expectedRes, resp)
60
61
61
62
return conn
62
63
}
···
76
77
require.NoError(t, err)
77
78
}
78
79
79
-
func subscribeOrUnsubscribetoTopics(t *testing.T, conn net.Conn, topics []string, action Action) {
80
+
func subscribeToTopics(t *testing.T, conn net.Conn, topics []string, startAtType StartAtType, startAtIndex int) {
81
+
actionB := make([]byte, 2)
82
+
binary.BigEndian.PutUint16(actionB, uint16(Subscribe))
83
+
headers := actionB
84
+
85
+
b, err := json.Marshal(topics)
86
+
require.NoError(t, err)
87
+
88
+
topicNamesB := make([]byte, 4)
89
+
binary.BigEndian.PutUint32(topicNamesB, uint32(len(b)))
90
+
headers = append(headers, topicNamesB...)
91
+
headers = append(headers, b...)
92
+
93
+
startAtTypeB := make([]byte, 2)
94
+
binary.BigEndian.PutUint16(startAtTypeB, uint16(startAtType))
95
+
headers = append(headers, startAtTypeB...)
96
+
97
+
if startAtType == From {
98
+
fromB := make([]byte, 2)
99
+
binary.BigEndian.PutUint16(fromB, uint16(startAtIndex))
100
+
headers = append(headers, fromB...)
101
+
}
102
+
103
+
_, err = conn.Write(headers)
104
+
require.NoError(t, err)
105
+
}
106
+
107
+
func unsubscribetoTopics(t *testing.T, conn net.Conn, topics []string) {
80
108
actionB := make([]byte, 2)
81
-
binary.BigEndian.PutUint16(actionB, uint16(action))
109
+
binary.BigEndian.PutUint16(actionB, uint16(Unsubscribe))
82
110
headers := actionB
83
111
84
112
b, err := json.Marshal(topics)
···
97
125
// existing topic
98
126
srv := createServerWithExistingTopic(t, topicA)
99
127
100
-
_ = createConnectionAndSubscribe(t, []string{topicA, topicB})
128
+
_ = createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
101
129
102
130
assert.Len(t, srv.topics, 2)
103
131
assert.Len(t, srv.topics[topicA].subscriptions, 1)
···
107
135
func TestUnsubscribesFromTopic(t *testing.T) {
108
136
srv := createServerWithExistingTopic(t, topicA)
109
137
110
-
conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC})
138
+
conn := createConnectionAndSubscribe(t, []string{topicA, topicB, topicC}, Current, 0)
111
139
112
140
assert.Len(t, srv.topics, 3)
113
141
assert.Len(t, srv.topics[topicA].subscriptions, 1)
···
116
144
117
145
topics := []string{topicA, topicB}
118
146
119
-
subscribeOrUnsubscribetoTopics(t, conn, topics, Unsubscribe)
147
+
unsubscribetoTopics(t, conn, topics)
120
148
121
149
expectedRes := Unsubscribed
122
150
···
124
152
err := binary.Read(conn, binary.BigEndian, &resp)
125
153
require.NoError(t, err)
126
154
127
-
assert.Equal(t, expectedRes, int(resp))
155
+
assert.Equal(t, expectedRes, resp)
128
156
129
157
assert.Len(t, srv.topics, 3)
130
158
assert.Len(t, srv.topics[topicA].subscriptions, 0)
···
135
163
func TestSubscriberClosesWithoutUnsubscribing(t *testing.T) {
136
164
srv := createServer(t)
137
165
138
-
conn := createConnectionAndSubscribe(t, []string{topicA, topicB})
166
+
conn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
139
167
140
168
assert.Len(t, srv.topics, 2)
141
169
assert.Len(t, srv.topics[topicA].subscriptions, 1)
···
155
183
156
184
sendMessage(t, publisherConn, topicA, data)
157
185
186
+
// the timeout for a connection is 100 milliseconds, so we should wait at least this long before checking the unsubscribe
187
+
// TODO: see if theres a better way, but without this, the test is flakey
188
+
time.Sleep(time.Millisecond * 100)
189
+
158
190
assert.Len(t, srv.topics, 2)
159
191
assert.Len(t, srv.topics[topicA].subscriptions, 0)
160
192
assert.Len(t, srv.topics[topicB].subscriptions, 0)
···
175
207
err = binary.Read(conn, binary.BigEndian, &resp)
176
208
require.NoError(t, err)
177
209
178
-
assert.Equal(t, expectedRes, int(resp))
210
+
assert.Equal(t, expectedRes, resp)
179
211
180
212
expectedMessage := "unknown action"
181
213
···
213
245
err = binary.Read(publisherConn, binary.BigEndian, &resp)
214
246
require.NoError(t, err)
215
247
216
-
assert.Equal(t, expectedRes, int(resp))
248
+
assert.Equal(t, expectedRes, resp)
217
249
218
250
expectedMessage := "topic data does not contain 'topic:' prefix"
219
251
···
234
266
235
267
subscribers := make([]net.Conn, 0, 10)
236
268
for i := 0; i < 10; i++ {
237
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
269
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
238
270
239
271
subscribers = append(subscribers, subscriberConn)
240
272
}
···
252
284
253
285
// check the subsribers got the data
254
286
for _, conn := range subscribers {
255
-
var topicLen uint64
256
-
err = binary.Read(conn, binary.BigEndian, &topicLen)
257
-
require.NoError(t, err)
258
-
259
-
topicBuf := make([]byte, topicLen)
260
-
_, err = conn.Read(topicBuf)
261
-
require.NoError(t, err)
262
-
assert.Equal(t, topicA, string(topicBuf))
263
-
264
-
var dataLen uint64
265
-
err = binary.Read(conn, binary.BigEndian, &dataLen)
266
-
require.NoError(t, err)
267
-
268
-
buf := make([]byte, dataLen)
269
-
n, err := conn.Read(buf)
270
-
require.NoError(t, err)
271
-
272
-
require.Equal(t, int(dataLen), n)
273
-
274
-
assert.Equal(t, messageData, string(buf))
275
-
276
-
err = binary.Write(conn, binary.BigEndian, Ack)
277
-
require.NoError(t, err)
287
+
msg := readMessage(t, conn)
288
+
assert.Equal(t, messageData, string(msg))
278
289
}
279
290
}
280
291
···
294
305
295
306
subscribeFinCh := make(chan struct{})
296
307
// create a subscriber that will read messages
297
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
308
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
298
309
go func() {
299
310
// check subscriber got all messages
300
311
results := make([]string, 0, len(messages))
301
312
for i := 0; i < len(messages); i++ {
302
-
var topicLen uint64
303
-
err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
304
-
require.NoError(t, err)
305
-
306
-
topicBuf := make([]byte, topicLen)
307
-
_, err = subscriberConn.Read(topicBuf)
308
-
require.NoError(t, err)
309
-
assert.Equal(t, topicA, string(topicBuf))
310
-
311
-
var dataLen uint64
312
-
err = binary.Read(subscriberConn, binary.BigEndian, &dataLen)
313
-
require.NoError(t, err)
314
-
315
-
buf := make([]byte, dataLen)
316
-
n, err := subscriberConn.Read(buf)
317
-
require.NoError(t, err)
318
-
require.Equal(t, int(dataLen), n)
319
-
320
-
results = append(results, string(buf))
321
-
322
-
err = binary.Write(subscriberConn, binary.BigEndian, Ack)
323
-
require.NoError(t, err)
313
+
msg := readMessage(t, subscriberConn)
314
+
results = append(results, string(msg))
324
315
}
325
316
326
317
assert.ElementsMatch(t, results, messages)
···
346
337
func TestSendsDataToTopicSubscriberNacksThenAcks(t *testing.T) {
347
338
_ = createServer(t)
348
339
349
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
340
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
350
341
351
342
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
352
343
require.NoError(t, err)
···
400
391
func TestSendsDataToTopicSubscriberDoesntAckMessage(t *testing.T) {
401
392
_ = createServer(t)
402
393
403
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
394
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
404
395
405
396
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
406
397
require.NoError(t, err)
···
458
449
func TestSendsDataToTopicSubscriberDeliveryCountTooHighWithNoAck(t *testing.T) {
459
450
_ = createServer(t)
460
451
461
-
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB})
452
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA, topicB}, Current, 0)
462
453
463
454
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
464
455
require.NoError(t, err)
···
514
505
err = binary.Read(subscriberConn, binary.BigEndian, &topicLen)
515
506
require.Error(t, err)
516
507
}
508
+
509
+
func TestSubscribeAndReplaysFromStart(t *testing.T) {
510
+
_ = createServer(t)
511
+
512
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
513
+
require.NoError(t, err)
514
+
515
+
err = binary.Write(publisherConn, binary.BigEndian, Publish)
516
+
require.NoError(t, err)
517
+
518
+
messages := make([]string, 0, 10)
519
+
for i := 0; i < 10; i++ {
520
+
messages = append(messages, fmt.Sprintf("message %d", i))
521
+
}
522
+
523
+
topic := fmt.Sprintf("topic:%s", topicA)
524
+
525
+
for _, msg := range messages {
526
+
sendMessage(t, publisherConn, topic, []byte(msg))
527
+
}
528
+
529
+
// send some messages for topic B as well
530
+
sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1"))
531
+
sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2"))
532
+
sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3"))
533
+
534
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 0)
535
+
results := make([]string, 0, len(messages))
536
+
for i := 0; i < len(messages); i++ {
537
+
msg := readMessage(t, subscriberConn)
538
+
results = append(results, string(msg))
539
+
}
540
+
assert.ElementsMatch(t, results, messages)
541
+
}
542
+
543
+
func TestSubscribeAndReplaysFromIndex(t *testing.T) {
544
+
_ = createServer(t)
545
+
546
+
publisherConn, err := net.Dial("tcp", fmt.Sprintf("localhost%s", serverAddr))
547
+
require.NoError(t, err)
548
+
549
+
err = binary.Write(publisherConn, binary.BigEndian, Publish)
550
+
require.NoError(t, err)
551
+
552
+
messages := make([]string, 0, 10)
553
+
for i := 0; i < 10; i++ {
554
+
messages = append(messages, fmt.Sprintf("message %d", i))
555
+
}
556
+
557
+
topic := fmt.Sprintf("topic:%s", topicA)
558
+
559
+
// send multiple messages
560
+
for _, msg := range messages {
561
+
sendMessage(t, publisherConn, topic, []byte(msg))
562
+
}
563
+
564
+
// send some messages for topic B as well
565
+
sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 1"))
566
+
sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 2"))
567
+
sendMessage(t, publisherConn, fmt.Sprintf("topic:%s", topicB), []byte("topic b message 3"))
568
+
569
+
subscriberConn := createConnectionAndSubscribe(t, []string{topicA}, From, 3)
570
+
571
+
// now that the subscriber has subecribed send another message that should arrive after all the other messages were consumed
572
+
sendMessage(t, publisherConn, topic, []byte("hello there"))
573
+
574
+
results := make([]string, 0, len(messages))
575
+
for i := 0; i < len(messages)-3; i++ {
576
+
msg := readMessage(t, subscriberConn)
577
+
results = append(results, string(msg))
578
+
}
579
+
require.Len(t, results, 7)
580
+
expMessages := make([]string, 0, 7)
581
+
for i, msg := range messages {
582
+
if i < 3 {
583
+
continue
584
+
}
585
+
expMessages = append(expMessages, msg)
586
+
}
587
+
assert.Equal(t, expMessages, results)
588
+
589
+
// now check we can get the message that was sent after the subscription was created
590
+
msg := readMessage(t, subscriberConn)
591
+
assert.Equal(t, "hello there", string(msg))
592
+
}
593
+
594
+
func readMessage(t *testing.T, subscriberConn net.Conn) []byte {
595
+
var topicLen uint64
596
+
err := binary.Read(subscriberConn, binary.BigEndian, &topicLen)
597
+
require.NoError(t, err)
598
+
599
+
topicBuf := make([]byte, topicLen)
600
+
_, err = subscriberConn.Read(topicBuf)
601
+
require.NoError(t, err)
602
+
assert.Equal(t, topicA, string(topicBuf))
603
+
604
+
var dataLen uint64
605
+
err = binary.Read(subscriberConn, binary.BigEndian, &dataLen)
606
+
require.NoError(t, err)
607
+
608
+
buf := make([]byte, dataLen)
609
+
n, err := subscriberConn.Read(buf)
610
+
require.NoError(t, err)
611
+
require.Equal(t, int(dataLen), n)
612
+
613
+
err = binary.Write(subscriberConn, binary.BigEndian, Ack)
614
+
require.NoError(t, err)
615
+
616
+
return buf
617
+
}
+14
-3
server/subscriber.go
+14
-3
server/subscriber.go
···
29
29
return message{data: data, deliveryCount: 1}
30
30
}
31
31
32
-
func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration) *subscriber {
32
+
func newSubscriber(peer *peer.Peer, topic *topic, ackDelay, ackTimeout time.Duration, startAt int) *subscriber {
33
33
s := &subscriber{
34
34
peer: peer,
35
-
topic: topic,
35
+
topic: topic.name,
36
36
messages: make(chan message),
37
37
ackDelay: ackDelay,
38
38
ackTimeout: ackTimeout,
39
-
unsubscribeCh: make(chan struct{}),
39
+
unsubscribeCh: make(chan struct{}, 1),
40
40
}
41
41
42
42
go s.sendMessages()
43
+
44
+
go func() {
45
+
topic.messageStore.ReadFrom(startAt, func(msg message) {
46
+
select {
47
+
case s.messages <- msg:
48
+
return
49
+
case <-s.unsubscribeCh:
50
+
return
51
+
}
52
+
})
53
+
}()
43
54
44
55
return s
45
56
}
+18
-2
server/topic.go
+18
-2
server/topic.go
···
1
1
package server
2
2
3
3
import (
4
+
"fmt"
4
5
"net"
5
6
"sync"
6
7
)
7
8
9
+
type Store interface {
10
+
Write(msg message) error
11
+
ReadFrom(offset int, handleFunc func(msg message))
12
+
}
13
+
8
14
type topic struct {
9
15
name string
10
16
subscriptions map[net.Addr]*subscriber
11
17
mu sync.Mutex
18
+
messageStore Store
12
19
}
13
20
14
21
func newTopic(name string) *topic {
22
+
messageStore := NewMemoryStore()
15
23
return &topic{
16
24
name: name,
17
25
subscriptions: make(map[net.Addr]*subscriber),
26
+
messageStore: messageStore,
18
27
}
19
28
}
20
29
21
-
func (t *topic) sendMessageToSubscribers(msgData []byte) {
30
+
func (t *topic) sendMessageToSubscribers(msg message) error {
31
+
err := t.messageStore.Write(msg)
32
+
if err != nil {
33
+
return fmt.Errorf("failed to write message to store: %w", err)
34
+
}
35
+
22
36
t.mu.Lock()
23
37
subscribers := t.subscriptions
24
38
t.mu.Unlock()
25
39
26
40
for _, subscriber := range subscribers {
27
-
subscriber.addMessage(newMessage(msgData), 0)
41
+
subscriber.addMessage(msg, 0)
28
42
}
43
+
44
+
return nil
29
45
}