+1
-1
example/main.go
+1
-1
example/main.go
+8
-4
pubsub/subscriber.go
+8
-4
pubsub/subscriber.go
···
132
132
// Consumer allows the consumption of messages. It is thread safe to range over the Msgs channel to consume. If during the consumer
133
133
// receiving messages from the server an error occurs, it will be stored in Err
134
134
type Consumer struct {
135
-
Msgs chan messagebroker.Message
135
+
msgs chan messagebroker.Message
136
136
// TODO: better error handling? Maybe a channel of errors?
137
137
Err error
138
+
}
139
+
140
+
func (c *Consumer) Messages() <-chan messagebroker.Message {
141
+
return c.msgs
138
142
}
139
143
140
144
// Consume will create a consumer and start it running in a go routine. You can then use the Msgs channel of the consumer
141
145
// to read the messages
142
146
func (s *Subscriber) Consume(ctx context.Context) *Consumer {
143
147
consumer := &Consumer{
144
-
Msgs: make(chan messagebroker.Message),
148
+
msgs: make(chan messagebroker.Message),
145
149
}
146
150
147
151
go s.consume(ctx, consumer)
···
150
154
}
151
155
152
156
func (s *Subscriber) consume(ctx context.Context, consumer *Consumer) {
153
-
defer close(consumer.Msgs)
157
+
defer close(consumer.msgs)
154
158
for {
155
159
if ctx.Err() != nil {
156
160
return
···
163
167
}
164
168
165
169
if msg != nil {
166
-
consumer.Msgs <- *msg
170
+
consumer.msgs <- *msg
167
171
}
168
172
}
169
173
}
+2
-2
pubsub/subscriber_test.go
+2
-2
pubsub/subscriber_test.go
···
107
107
var receivedMessages []messagebroker.Message
108
108
consumerFinCh := make(chan struct{})
109
109
go func() {
110
-
for msg := range consumer.Msgs {
110
+
for msg := range consumer.Messages() {
111
111
receivedMessages = append(receivedMessages, msg)
112
112
}
113
113
···
180
180
181
181
consumerFinCh := make(chan struct{})
182
182
go func() {
183
-
for msg := range consumer.Msgs {
183
+
for msg := range consumer.Messages() {
184
184
receivedMessages = append(receivedMessages, msg)
185
185
}
186
186