+9
-4
pubsub/subscriber.go
+9
-4
pubsub/subscriber.go
···
173
return
174
}
175
176
-
err := s.readMessage(consumer.msgs)
177
if err != nil {
178
consumer.Err = err
179
return
···
181
}
182
}
183
184
-
func (s *Subscriber) readMessage(msgChan chan *Message) error {
185
-
// var msg *Message
186
op := func(conn net.Conn) error {
187
err := s.conn.SetReadDeadline(time.Now().Add(time.Second))
188
if err != nil {
···
225
226
msgChan <- msg
227
228
-
ack := <-msg.ack
229
230
ackMessage := server.Nack
231
if ack {
···
173
return
174
}
175
176
+
err := s.readMessage(ctx, consumer.msgs)
177
if err != nil {
178
consumer.Err = err
179
return
···
181
}
182
}
183
184
+
func (s *Subscriber) readMessage(ctx context.Context, msgChan chan *Message) error {
185
op := func(conn net.Conn) error {
186
err := s.conn.SetReadDeadline(time.Now().Add(time.Second))
187
if err != nil {
···
224
225
msgChan <- msg
226
227
+
var ack bool
228
+
select {
229
+
case <-ctx.Done():
230
+
return ctx.Err()
231
+
case ack = <-msg.ack:
232
+
}
233
+
//ack := <-msg.ack
234
235
ackMessage := server.Nack
236
if ack {
+3
-2
pubsub/subscriber_test.go
+3
-2
pubsub/subscriber_test.go
···
134
err = publisher.PublishMessage(msg)
135
require.NoError(t, err)
136
137
cancel()
138
139
select {
···
180
}
181
182
// give the consumer some time to read the messages -- TODO: make better!
183
-
time.Sleep(time.Millisecond * 500)
184
cancel()
185
186
select {
···
231
require.NoError(t, err)
232
233
// give the consumer some time to read the messages -- TODO: make better!
234
-
time.Sleep(time.Millisecond * 500)
235
cancel()
236
237
select {
···
134
err = publisher.PublishMessage(msg)
135
require.NoError(t, err)
136
137
+
time.Sleep(time.Second)
138
cancel()
139
140
select {
···
181
}
182
183
// give the consumer some time to read the messages -- TODO: make better!
184
+
time.Sleep(time.Second)
185
cancel()
186
187
select {
···
232
require.NoError(t, err)
233
234
// give the consumer some time to read the messages -- TODO: make better!
235
+
time.Sleep(time.Second)
236
cancel()
237
238
select {