An experimental pub/sub client and server project.
1package client
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8
9 "github.com/stretchr/testify/assert"
10 "github.com/stretchr/testify/require"
11 "github.com/willdot/messagebroker/internal/server"
12)
13
// Fixtures shared by the tests in this file.
const (
	// serverAddr is the address handed to server.New and to the client
	// constructors under test.
	serverAddr = ":9999"

	// topicA and topicB are the topic names the tests subscribe and publish to.
	topicA, topicB = "topic a", "topic b"
)
19
20func createServer(t *testing.T) {
21 server, err := server.New(serverAddr, time.Millisecond*100, time.Millisecond*100)
22 require.NoError(t, err)
23
24 t.Cleanup(func() {
25 _ = server.Shutdown()
26 })
27}
28
29func TestNewSubscriber(t *testing.T) {
30 createServer(t)
31
32 sub, err := NewSubscriber(serverAddr)
33 require.NoError(t, err)
34
35 t.Cleanup(func() {
36 sub.Close()
37 })
38}
39
40func TestNewSubscriberInvalidServerAddr(t *testing.T) {
41 createServer(t)
42
43 _, err := NewSubscriber(":123456")
44 require.Error(t, err)
45}
46
47func TestNewPublisher(t *testing.T) {
48 createServer(t)
49
50 sub, err := NewPublisher(serverAddr)
51 require.NoError(t, err)
52
53 t.Cleanup(func() {
54 sub.Close()
55 })
56}
57
58func TestNewPublisherInvalidServerAddr(t *testing.T) {
59 createServer(t)
60
61 _, err := NewPublisher(":123456")
62 require.Error(t, err)
63}
64
65func TestSubscribeToTopics(t *testing.T) {
66 createServer(t)
67
68 sub, err := NewSubscriber(serverAddr)
69 require.NoError(t, err)
70
71 t.Cleanup(func() {
72 sub.Close()
73 })
74
75 topics := []string{topicA, topicB}
76
77 err = sub.SubscribeToTopics(topics, server.Current, 0)
78 require.NoError(t, err)
79}
80
81func TestUnsubscribesFromTopic(t *testing.T) {
82 createServer(t)
83
84 sub, err := NewSubscriber(serverAddr)
85 require.NoError(t, err)
86
87 t.Cleanup(func() {
88 sub.Close()
89 })
90
91 topics := []string{topicA, topicB}
92
93 err = sub.SubscribeToTopics(topics, server.Current, 0)
94 require.NoError(t, err)
95
96 err = sub.UnsubscribeToTopics([]string{topicA})
97 require.NoError(t, err)
98
99 ctx, cancel := context.WithCancel(context.Background())
100 t.Cleanup(func() {
101 cancel()
102 })
103
104 consumer := sub.Consume(ctx)
105 require.NoError(t, err)
106
107 var receivedMessages []*Message
108 consumerFinCh := make(chan struct{})
109 go func() {
110 for msg := range consumer.Messages() {
111 msg.Ack(true)
112 receivedMessages = append(receivedMessages, msg)
113 }
114
115 consumerFinCh <- struct{}{}
116 }()
117
118 // publish a message to both topics and check the subscriber only gets the message from the 1 topic
119 // and not the unsubscribed topic
120
121 publisher, err := NewPublisher("localhost:9999")
122 require.NoError(t, err)
123 t.Cleanup(func() {
124 publisher.Close()
125 })
126
127 msg := NewMessage(topicA, []byte("hello world"))
128
129 err = publisher.PublishMessage(msg)
130 require.NoError(t, err)
131
132 msg.Topic = topicB
133 err = publisher.PublishMessage(msg)
134 require.NoError(t, err)
135
136 // give the consumer some time to read the messages -- TODO: make better!
137 time.Sleep(time.Millisecond * 300)
138 cancel()
139
140 select {
141 case <-consumerFinCh:
142 break
143 case <-time.After(time.Second):
144 t.Fatal("timed out waiting for consumer to read messages")
145 }
146
147 assert.Len(t, receivedMessages, 1)
148 assert.Equal(t, topicB, receivedMessages[0].Topic)
149}
150
151func TestPublishAndSubscribe(t *testing.T) {
152 consumer, cancel := setupConsumer(t)
153
154 var receivedMessages []*Message
155
156 consumerFinCh := make(chan struct{})
157 go func() {
158 for msg := range consumer.Messages() {
159 msg.Ack(true)
160 receivedMessages = append(receivedMessages, msg)
161 }
162
163 consumerFinCh <- struct{}{}
164 }()
165
166 publisher, err := NewPublisher("localhost:9999")
167 require.NoError(t, err)
168 t.Cleanup(func() {
169 publisher.Close()
170 })
171
172 // send some messages
173 sentMessages := make([]*Message, 0, 10)
174 for i := 0; i < 10; i++ {
175 msg := NewMessage(topicA, []byte(fmt.Sprintf("message %d", i)))
176
177 sentMessages = append(sentMessages, msg)
178
179 err = publisher.PublishMessage(msg)
180 require.NoError(t, err)
181 }
182
183 // give the consumer some time to read the messages -- TODO: make better!
184 time.Sleep(time.Millisecond * 300)
185 cancel()
186
187 select {
188 case <-consumerFinCh:
189 break
190 case <-time.After(time.Second * 5):
191 t.Fatal("timed out waiting for consumer to read messages")
192 }
193
194 // THIS IS SO HACKY
195 for _, msg := range receivedMessages {
196 msg.ack = nil
197 }
198
199 for _, msg := range sentMessages {
200 msg.ack = nil
201 }
202
203 assert.ElementsMatch(t, receivedMessages, sentMessages)
204}
205
206func TestPublishAndSubscribeNackMessage(t *testing.T) {
207 consumer, cancel := setupConsumer(t)
208
209 var receivedMessages []*Message
210
211 consumerFinCh := make(chan struct{})
212 timesMsgWasReceived := 0
213 go func() {
214 for msg := range consumer.Messages() {
215 msg.Ack(false)
216 timesMsgWasReceived++
217 }
218
219 consumerFinCh <- struct{}{}
220 }()
221
222 publisher, err := NewPublisher("localhost:9999")
223 require.NoError(t, err)
224 t.Cleanup(func() {
225 publisher.Close()
226 })
227
228 // send a message
229 msg := NewMessage(topicA, []byte("hello world"))
230
231 err = publisher.PublishMessage(msg)
232 require.NoError(t, err)
233
234 // give the consumer some time to read the messages -- TODO: make better!
235 time.Sleep(time.Second)
236 cancel()
237
238 select {
239 case <-consumerFinCh:
240 break
241 case <-time.After(time.Second * 5):
242 t.Fatal("timed out waiting for consumer to read messages")
243 }
244
245 assert.Empty(t, receivedMessages)
246 assert.Equal(t, 5, timesMsgWasReceived)
247}
248
249func setupConsumer(t *testing.T) (*Consumer, context.CancelFunc) {
250 createServer(t)
251
252 sub, err := NewSubscriber(serverAddr)
253 require.NoError(t, err)
254
255 t.Cleanup(func() {
256 sub.Close()
257 })
258
259 topics := []string{topicA, topicB}
260
261 err = sub.SubscribeToTopics(topics, server.Current, 0)
262 require.NoError(t, err)
263
264 ctx, cancel := context.WithCancel(context.Background())
265 t.Cleanup(func() {
266 cancel()
267 })
268
269 consumer := sub.Consume(ctx)
270 require.NoError(t, err)
271
272 return consumer, cancel
273}