+1
-1
example/main.go
+1
-1
example/main.go
+3
-11
server/server.go
+3
-11
server/server.go
···
75
From StartAtType = 2
76
)
77
78
-
type Store interface {
79
-
Write(msg MessageToSend) error
80
-
ReadFrom(offset int, handleFunc func(msg MessageToSend)) error
81
-
}
82
-
83
// Server accepts subscribe and publish connections and passes messages around
84
type Server struct {
85
Addr string
···
222
}
223
224
var topics []string
225
-
fmt.Println(string(buf))
226
err = json.Unmarshal(buf, &topics)
227
if err != nil {
228
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.Addr())
···
316
func (s *Server) handlePublish(peer *peer.Peer) {
317
slog.Info("handling publisher", "peer", peer.Addr())
318
for {
319
-
var message *MessageToSend
320
-
321
op := func(conn net.Conn) error {
322
dataLen, err := dataLength(conn)
323
if err != nil {
···
365
return nil
366
}
367
368
-
message = &MessageToSend{
369
topic: topicStr,
370
data: dataBuf,
371
}
···
376
s.topics[message.topic] = topic
377
}
378
379
-
err = topic.sendMessageToSubscribers(*message)
380
if err != nil {
381
slog.Error("failed to send message to subscribers", "error", err, "peer", peer.Addr())
382
writeStatus(Error, "failed to send message to subscribers", conn)
···
406
t = newTopic(topicName)
407
}
408
409
-
t.subscriptions[peer.Addr()] = newSubscriber(peer, topicName, s.ackDelay, s.ackTimeout, t.messageStore, startAt)
410
411
s.topics[topicName] = t
412
}
···
75
From StartAtType = 2
76
)
77
78
// Server accepts subscribe and publish connections and passes messages around
79
type Server struct {
80
Addr string
···
217
}
218
219
var topics []string
220
err = json.Unmarshal(buf, &topics)
221
if err != nil {
222
slog.Error("failed to unmarshal subscibers topic data", "error", err, "peer", peer.Addr())
···
310
func (s *Server) handlePublish(peer *peer.Peer) {
311
slog.Info("handling publisher", "peer", peer.Addr())
312
for {
313
op := func(conn net.Conn) error {
314
dataLen, err := dataLength(conn)
315
if err != nil {
···
357
return nil
358
}
359
360
+
message := MessageToSend{
361
topic: topicStr,
362
data: dataBuf,
363
}
···
368
s.topics[message.topic] = topic
369
}
370
371
+
err = topic.sendMessageToSubscribers(message)
372
if err != nil {
373
slog.Error("failed to send message to subscribers", "error", err, "peer", peer.Addr())
374
writeStatus(Error, "failed to send message to subscribers", conn)
···
398
t = newTopic(topicName)
399
}
400
401
+
t.subscriptions[peer.Addr()] = newSubscriber(peer, t, s.ackDelay, s.ackTimeout, startAt)
402
403
s.topics[topicName] = t
404
}
+3
-3
server/subscriber.go
+3
-3
server/subscriber.go
···
29
return message{data: data, deliveryCount: 1}
30
}
31
32
-
func newSubscriber(peer *peer.Peer, topic string, ackDelay, ackTimeout time.Duration, messageStore Store, startAt int) *subscriber {
33
s := &subscriber{
34
peer: peer,
35
-
topic: topic,
36
messages: make(chan message),
37
ackDelay: ackDelay,
38
ackTimeout: ackTimeout,
···
44
offset := startAt
45
46
go func() {
47
-
err := messageStore.ReadFrom(offset, func(msg MessageToSend) {
48
s.messages <- newMessage(msg.data)
49
})
50
if err != nil {
···
29
return message{data: data, deliveryCount: 1}
30
}
31
32
+
func newSubscriber(peer *peer.Peer, topic *topic, ackDelay, ackTimeout time.Duration, startAt int) *subscriber {
33
s := &subscriber{
34
peer: peer,
35
+
topic: topic.name,
36
messages: make(chan message),
37
ackDelay: ackDelay,
38
ackTimeout: ackTimeout,
···
44
offset := startAt
45
46
go func() {
47
+
err := topic.messageStore.ReadFrom(offset, func(msg MessageToSend) {
48
s.messages <- newMessage(msg.data)
49
})
50
if err != nil {
+5
server/topic.go
+5
server/topic.go