An experimental pub/sub client and server project.
at main 5.5 kB view raw
1package client 2 3import ( 4 "context" 5 "fmt" 6 "testing" 7 "time" 8 9 "github.com/stretchr/testify/assert" 10 "github.com/stretchr/testify/require" 11 "github.com/willdot/messagebroker/internal/server" 12) 13 14const ( 15 serverAddr = ":9999" 16 topicA = "topic a" 17 topicB = "topic b" 18) 19 20func createServer(t *testing.T) { 21 server, err := server.New(serverAddr, time.Millisecond*100, time.Millisecond*100) 22 require.NoError(t, err) 23 24 t.Cleanup(func() { 25 _ = server.Shutdown() 26 }) 27} 28 29func TestNewSubscriber(t *testing.T) { 30 createServer(t) 31 32 sub, err := NewSubscriber(serverAddr) 33 require.NoError(t, err) 34 35 t.Cleanup(func() { 36 sub.Close() 37 }) 38} 39 40func TestNewSubscriberInvalidServerAddr(t *testing.T) { 41 createServer(t) 42 43 _, err := NewSubscriber(":123456") 44 require.Error(t, err) 45} 46 47func TestNewPublisher(t *testing.T) { 48 createServer(t) 49 50 sub, err := NewPublisher(serverAddr) 51 require.NoError(t, err) 52 53 t.Cleanup(func() { 54 sub.Close() 55 }) 56} 57 58func TestNewPublisherInvalidServerAddr(t *testing.T) { 59 createServer(t) 60 61 _, err := NewPublisher(":123456") 62 require.Error(t, err) 63} 64 65func TestSubscribeToTopics(t *testing.T) { 66 createServer(t) 67 68 sub, err := NewSubscriber(serverAddr) 69 require.NoError(t, err) 70 71 t.Cleanup(func() { 72 sub.Close() 73 }) 74 75 topics := []string{topicA, topicB} 76 77 err = sub.SubscribeToTopics(topics, server.Current, 0) 78 require.NoError(t, err) 79} 80 81func TestUnsubscribesFromTopic(t *testing.T) { 82 createServer(t) 83 84 sub, err := NewSubscriber(serverAddr) 85 require.NoError(t, err) 86 87 t.Cleanup(func() { 88 sub.Close() 89 }) 90 91 topics := []string{topicA, topicB} 92 93 err = sub.SubscribeToTopics(topics, server.Current, 0) 94 require.NoError(t, err) 95 96 err = sub.UnsubscribeToTopics([]string{topicA}) 97 require.NoError(t, err) 98 99 ctx, cancel := context.WithCancel(context.Background()) 100 t.Cleanup(func() { 101 cancel() 102 }) 103 104 consumer := sub.Consume(ctx) 105 require.NoError(t, err) 106 107 var receivedMessages []*Message 108 consumerFinCh := make(chan struct{}) 109 go func() { 110 for msg := range consumer.Messages() { 111 msg.Ack(true) 112 receivedMessages = append(receivedMessages, msg) 113 } 114 115 consumerFinCh <- struct{}{} 116 }() 117 118 // publish a message to both topics and check the subscriber only gets the message from the 1 topic 119 // and not the unsubscribed topic 120 121 publisher, err := NewPublisher("localhost:9999") 122 require.NoError(t, err) 123 t.Cleanup(func() { 124 publisher.Close() 125 }) 126 127 msg := NewMessage(topicA, []byte("hello world")) 128 129 err = publisher.PublishMessage(msg) 130 require.NoError(t, err) 131 132 msg.Topic = topicB 133 err = publisher.PublishMessage(msg) 134 require.NoError(t, err) 135 136 // give the consumer some time to read the messages -- TODO: make better! 137 time.Sleep(time.Millisecond * 300) 138 cancel() 139 140 select { 141 case <-consumerFinCh: 142 break 143 case <-time.After(time.Second): 144 t.Fatal("timed out waiting for consumer to read messages") 145 } 146 147 assert.Len(t, receivedMessages, 1) 148 assert.Equal(t, topicB, receivedMessages[0].Topic) 149} 150 151func TestPublishAndSubscribe(t *testing.T) { 152 consumer, cancel := setupConsumer(t) 153 154 var receivedMessages []*Message 155 156 consumerFinCh := make(chan struct{}) 157 go func() { 158 for msg := range consumer.Messages() { 159 msg.Ack(true) 160 receivedMessages = append(receivedMessages, msg) 161 } 162 163 consumerFinCh <- struct{}{} 164 }() 165 166 publisher, err := NewPublisher("localhost:9999") 167 require.NoError(t, err) 168 t.Cleanup(func() { 169 publisher.Close() 170 }) 171 172 // send some messages 173 sentMessages := make([]*Message, 0, 10) 174 for i := 0; i < 10; i++ { 175 msg := NewMessage(topicA, []byte(fmt.Sprintf("message %d", i))) 176 177 sentMessages = append(sentMessages, msg) 178 179 err = publisher.PublishMessage(msg) 180 require.NoError(t, err) 181 } 182 183 // give the consumer some time to read the messages -- TODO: make better! 184 time.Sleep(time.Millisecond * 300) 185 cancel() 186 187 select { 188 case <-consumerFinCh: 189 break 190 case <-time.After(time.Second * 5): 191 t.Fatal("timed out waiting for consumer to read messages") 192 } 193 194 // THIS IS SO HACKY 195 for _, msg := range receivedMessages { 196 msg.ack = nil 197 } 198 199 for _, msg := range sentMessages { 200 msg.ack = nil 201 } 202 203 assert.ElementsMatch(t, receivedMessages, sentMessages) 204} 205 206func TestPublishAndSubscribeNackMessage(t *testing.T) { 207 consumer, cancel := setupConsumer(t) 208 209 var receivedMessages []*Message 210 211 consumerFinCh := make(chan struct{}) 212 timesMsgWasReceived := 0 213 go func() { 214 for msg := range consumer.Messages() { 215 msg.Ack(false) 216 timesMsgWasReceived++ 217 } 218 219 consumerFinCh <- struct{}{} 220 }() 221 222 publisher, err := NewPublisher("localhost:9999") 223 require.NoError(t, err) 224 t.Cleanup(func() { 225 publisher.Close() 226 }) 227 228 // send a message 229 msg := NewMessage(topicA, []byte("hello world")) 230 231 err = publisher.PublishMessage(msg) 232 require.NoError(t, err) 233 234 // give the consumer some time to read the messages -- TODO: make better! 235 time.Sleep(time.Second) 236 cancel() 237 238 select { 239 case <-consumerFinCh: 240 break 241 case <-time.After(time.Second * 5): 242 t.Fatal("timed out waiting for consumer to read messages") 243 } 244 245 assert.Empty(t, receivedMessages) 246 assert.Equal(t, 5, timesMsgWasReceived) 247} 248 249func setupConsumer(t *testing.T) (*Consumer, context.CancelFunc) { 250 createServer(t) 251 252 sub, err := NewSubscriber(serverAddr) 253 require.NoError(t, err) 254 255 t.Cleanup(func() { 256 sub.Close() 257 }) 258 259 topics := []string{topicA, topicB} 260 261 err = sub.SubscribeToTopics(topics, server.Current, 0) 262 require.NoError(t, err) 263 264 ctx, cancel := context.WithCancel(context.Background()) 265 t.Cleanup(func() { 266 cancel() 267 }) 268 269 consumer := sub.Consume(ctx) 270 require.NoError(t, err) 271 272 return consumer, cancel 273}