+98
-6
client/subscriber.go
+98
-6
client/subscriber.go
···
6
"encoding/json"
7
"errors"
8
"fmt"
9
"net"
10
"sync"
11
"time"
12
13
"github.com/willdot/messagebroker/internal/server"
···
17
18
// Subscriber allows subscriptions to a server and the consumption of messages
19
type Subscriber struct {
20
-
conn net.Conn
21
-
connMu sync.Mutex
22
}
23
24
// NewSubscriber will connect to the server at the given address
···
30
31
return &Subscriber{
32
conn: conn,
33
}, nil
34
}
35
36
// Close cleanly shuts down the subscriber
37
func (s *Subscriber) Close() error {
38
return s.conn.Close()
···
44
return subscribeToTopics(conn, topicNames, startAtType, startAtIndex)
45
}
46
47
-
return s.connOperation(op)
48
}
49
50
// UnsubscribeToTopics will unsubscribe to the provided topics
···
53
return unsubscribeToTopics(conn, topicNames)
54
}
55
56
-
return s.connOperation(op)
57
}
58
59
func subscribeToTopics(conn net.Conn, topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
···
189
}
190
191
err := s.readMessage(ctx, consumer.msgs)
192
-
if err != nil {
193
-
// TODO: if broken pipe, we need to somehow reconnect and subscribe again....YIKES
194
consumer.Err = err
195
return
196
}
197
}
198
}
199
···
6
"encoding/json"
7
"errors"
8
"fmt"
9
+
"io"
10
+
"log/slog"
11
"net"
12
"sync"
13
+
"syscall"
14
"time"
15
16
"github.com/willdot/messagebroker/internal/server"
···
20
21
// Subscriber allows subscriptions to a server and the consumption of messages
22
type Subscriber struct {
23
+
conn net.Conn
24
+
connMu sync.Mutex
25
+
subscribedTopics []string
26
+
addr string
27
}
28
29
// NewSubscriber will connect to the server at the given address
···
35
36
return &Subscriber{
37
conn: conn,
38
+
addr: addr,
39
}, nil
40
}
41
42
+
func (s *Subscriber) reconnect() error {
43
+
conn, err := net.Dial("tcp", s.addr)
44
+
if err != nil {
45
+
return fmt.Errorf("failed to dial: %w", err)
46
+
}
47
+
48
+
s.conn = conn
49
+
return nil
50
+
}
51
+
52
// Close cleanly shuts down the subscriber
53
func (s *Subscriber) Close() error {
54
return s.conn.Close()
···
60
return subscribeToTopics(conn, topicNames, startAtType, startAtIndex)
61
}
62
63
+
err := s.connOperation(op)
64
+
if err != nil {
65
+
return fmt.Errorf("failed to subscribe to topics: %w", err)
66
+
}
67
+
68
+
s.addToSubscribedTopics(topicNames)
69
+
70
+
return nil
71
+
}
72
+
73
+
func (s *Subscriber) addToSubscribedTopics(topics []string) {
74
+
existingSubs := make(map[string]struct{})
75
+
for _, topic := range s.subscribedTopics {
76
+
existingSubs[topic] = struct{}{}
77
+
}
78
+
79
+
for _, topic := range topics {
80
+
existingSubs[topic] = struct{}{}
81
+
}
82
+
83
+
subs := make([]string, 0, len(existingSubs))
84
+
for topic := range existingSubs {
85
+
subs = append(subs, topic)
86
+
}
87
+
88
+
s.subscribedTopics = subs
89
+
}
90
+
91
+
func (s *Subscriber) removeTopicsFromSubscription(topics []string) {
92
+
existingSubs := make(map[string]struct{})
93
+
for _, topic := range s.subscribedTopics {
94
+
existingSubs[topic] = struct{}{}
95
+
}
96
+
97
+
for _, topic := range topics {
98
+
delete(existingSubs, topic)
99
+
}
100
+
101
+
subs := make([]string, 0, len(existingSubs))
102
+
for topic := range existingSubs {
103
+
subs = append(subs, topic)
104
+
}
105
+
106
+
s.subscribedTopics = subs
107
}
108
109
// UnsubscribeToTopics will unsubscribe to the provided topics
···
112
return unsubscribeToTopics(conn, topicNames)
113
}
114
115
+
err := s.connOperation(op)
116
+
if err != nil {
117
+
return fmt.Errorf("failed to unsubscribe to topics: %w", err)
118
+
}
119
+
120
+
s.removeTopicsFromSubscription(topicNames)
121
+
122
+
return nil
123
}
124
125
func subscribeToTopics(conn net.Conn, topicNames []string, startAtType server.StartAtType, startAtIndex int) error {
···
255
}
256
257
err := s.readMessage(ctx, consumer.msgs)
258
+
if err == nil {
259
+
continue
260
+
}
261
+
262
+
// if we couldn't connect to the server, attempt to reconnect
263
+
if !errors.Is(err, syscall.EPIPE) && !errors.Is(err, io.EOF) {
264
+
slog.Error("failed to read message", "error", err)
265
consumer.Err = err
266
return
267
}
268
+
269
+
slog.Info("attempting to reconnect")
270
+
271
+
for i := 0; i < 5; i++ {
272
+
time.Sleep(time.Millisecond * 500)
273
+
err = s.reconnect()
274
+
if err == nil {
275
+
break
276
+
}
277
+
278
+
slog.Error("Failed to reconnect", "error", err, "attempt", i)
279
+
}
280
+
281
+
slog.Info("attempting to resubscribe")
282
+
283
+
err = s.SubscribeToTopics(s.subscribedTopics, server.Current, 0)
284
+
if err != nil {
285
+
consumer.Err = fmt.Errorf("failed to subscribe to topics after reconnecting: %w", err)
286
+
return
287
+
}
288
+
289
}
290
}
291
+15
-7
example/main.go
+15
-7
example/main.go
···
11
"github.com/willdot/messagebroker/internal/server"
12
)
13
14
-
var publish *bool
15
var consumeFrom *int
16
17
const (
18
topic = "topic-a"
19
)
20
21
func main() {
22
-
publish = flag.Bool("publish", false, "will also publish messages every 500ms until client is stopped")
23
consumeFrom = flag.Int("consume-from", -1, "index of message to start consuming from. If not set it will consume from the most recent")
24
flag.Parse()
25
26
-
if *publish {
27
-
go sendMessages()
28
}
29
30
sub, err := client.NewSubscriber(":3000")
31
if err != nil {
32
panic(err)
···
56
slog.Info("received message", "message", string(msg.Data))
57
msg.Ack(true)
58
}
59
-
60
-
time.Sleep(time.Second * 30)
61
-
62
}
63
64
func sendMessages() {
···
11
"github.com/willdot/messagebroker/internal/server"
12
)
13
14
+
// var publish *bool
15
+
// var consume *bool
16
var consumeFrom *int
17
+
var clientType *string
18
19
const (
20
topic = "topic-a"
21
)
22
23
func main() {
24
+
clientType = flag.String("client-type", "consume", "consume or publish (default consume)")
25
+
// publish = flag.Bool("publish", false, "will publish messages every 500ms until client is stopped")
26
+
// consume = flag.Bool("consume", false, "will consume messages until client is stopped")
27
consumeFrom = flag.Int("consume-from", -1, "index of message to start consuming from. If not set it will consume from the most recent")
28
flag.Parse()
29
30
+
switch *clientType {
31
+
case "consume":
32
+
consume()
33
+
case "publish":
34
+
sendMessages()
35
+
default:
36
+
fmt.Println("unknown client type")
37
}
38
+
}
39
40
+
func consume() {
41
sub, err := client.NewSubscriber(":3000")
42
if err != nil {
43
panic(err)
···
67
slog.Info("received message", "message", string(msg.Data))
68
msg.Ack(true)
69
}
70
}
71
72
func sendMessages() {