+47
-5
client/publisher.go
+47
-5
client/publisher.go
···
2
3
import (
4
"encoding/binary"
5
"fmt"
6
"net"
7
"sync"
8
9
"github.com/willdot/messagebroker/internal/server"
10
)
···
13
type Publisher struct {
14
conn net.Conn
15
connMu sync.Mutex
16
}
17
18
// NewPublisher connects to the server at the given address and registers as a publisher
19
func NewPublisher(addr string) (*Publisher, error) {
20
conn, err := net.Dial("tcp", addr)
21
if err != nil {
22
return nil, fmt.Errorf("failed to dial: %w", err)
···
27
conn.Close()
28
return nil, fmt.Errorf("failed to register publish to server: %w", err)
29
}
30
-
31
-
return &Publisher{
32
-
conn: conn,
33
-
}, nil
34
}
35
36
// Close cleanly shuts down the publisher
···
40
41
// Publish will publish the given message to the server
42
func (p *Publisher) PublishMessage(message *Message) error {
43
op := func(conn net.Conn) error {
44
// send topic first
45
topic := fmt.Sprintf("topic:%s", message.Topic)
···
60
return nil
61
}
62
63
-
return p.connOperation(op)
64
}
65
66
func (p *Publisher) connOperation(op connOpp) error {
···
2
3
import (
4
"encoding/binary"
5
+
"errors"
6
"fmt"
7
+
"log/slog"
8
"net"
9
"sync"
10
+
"syscall"
11
12
"github.com/willdot/messagebroker/internal/server"
13
)
···
16
type Publisher struct {
17
conn net.Conn
18
connMu sync.Mutex
19
+
addr string
20
}
21
22
// NewPublisher connects to the server at the given address and registers as a publisher
23
func NewPublisher(addr string) (*Publisher, error) {
24
+
conn, err := connect(addr)
25
+
if err != nil {
26
+
return nil, fmt.Errorf("failed to connect to server: %w", err)
27
+
}
28
+
29
+
return &Publisher{
30
+
conn: conn,
31
+
addr: addr,
32
+
}, nil
33
+
}
34
+
35
+
func connect(addr string) (net.Conn, error) {
36
conn, err := net.Dial("tcp", addr)
37
if err != nil {
38
return nil, fmt.Errorf("failed to dial: %w", err)
···
43
conn.Close()
44
return nil, fmt.Errorf("failed to register publish to server: %w", err)
45
}
46
+
return conn, nil
47
}
48
49
// Close cleanly shuts down the publisher
···
53
54
// Publish will publish the given message to the server
55
func (p *Publisher) PublishMessage(message *Message) error {
56
+
return p.publishMessageWithRetry(message, 0)
57
+
}
58
+
59
+
func (p *Publisher) publishMessageWithRetry(message *Message, attempt int) error {
60
op := func(conn net.Conn) error {
61
// send topic first
62
topic := fmt.Sprintf("topic:%s", message.Topic)
···
77
return nil
78
}
79
80
+
err := p.connOperation(op)
81
+
if err == nil {
82
+
return nil
83
+
}
84
+
85
+
// we can handle a broken pipe by trying to reconnect, but if it's a different error return it
86
+
if !errors.Is(err, syscall.EPIPE) {
87
+
return err
88
+
}
89
+
90
+
slog.Info("error is broken pipe")
91
+
92
+
if attempt >= 5 {
93
+
return fmt.Errorf("failed to publish message after max attempts to reconnect (%d): %w", attempt, err)
94
+
}
95
+
96
+
slog.Error("failed to publish message", "error", err)
97
+
98
+
conn, connectErr := connect(p.addr)
99
+
if connectErr != nil {
100
+
return fmt.Errorf("failed to reconnect after failing to publish message: %w", connectErr)
101
+
}
102
+
103
+
p.conn = conn
104
+
105
+
return p.publishMessageWithRetry(message, attempt+1)
106
}
107
108
func (p *Publisher) connOperation(op connOpp) error {
+1
client/subscriber.go
+1
client/subscriber.go
+4
example/main.go
+4
example/main.go