An experimental pub/sub client and server project.
1package main
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "github.com/willdot/messagebroker/client"
11 "github.com/willdot/messagebroker/internal/server"
12)
13
14// var publish *bool
15// var consume *bool
// Command-line flag values. Both pointers are nil until main registers the
// flags and calls flag.Parse.
var (
	// consumeFrom is the value of the -consume-from flag; -1 is its default.
	consumeFrom *int
	// clientType is the value of the -client-type flag ("consume" or "publish").
	clientType *string
)
18
// topic is the single topic name this client publishes to and subscribes to.
const topic = "topic-a"
22
23func main() {
24 clientType = flag.String("client-type", "consume", "consume or publish (default consume)")
25 // publish = flag.Bool("publish", false, "will publish messages every 500ms until client is stopped")
26 // consume = flag.Bool("consume", false, "will consume messages until client is stopped")
27 consumeFrom = flag.Int("consume-from", -1, "index of message to start consuming from. If not set it will consume from the most recent")
28 flag.Parse()
29
30 switch *clientType {
31 case "consume":
32 consume()
33 case "publish":
34 sendMessages()
35 default:
36 fmt.Println("unknown client type")
37 }
38}
39
40func consume() {
41 sub, err := client.NewSubscriber(":3000")
42 if err != nil {
43 panic(err)
44 }
45
46 defer func() {
47 _ = sub.Close()
48 }()
49 startAt := 0
50 startAtType := server.Current
51 if *consumeFrom > -1 {
52 startAtType = server.From
53 startAt = *consumeFrom
54 }
55
56 err = sub.SubscribeToTopics([]string{topic}, startAtType, startAt)
57 if err != nil {
58 panic(err)
59 }
60
61 consumer := sub.Consume(context.Background())
62 if consumer.Err != nil {
63 panic(err)
64 }
65
66 for msg := range consumer.Messages() {
67 slog.Info("received message", "message", string(msg.Data))
68 msg.Ack(true)
69 }
70}
71
72func sendMessages() {
73 publisher, err := client.NewPublisher("localhost:3000")
74 if err != nil {
75 panic(err)
76 }
77
78 defer func() {
79 _ = publisher.Close()
80 }()
81
82 // send some messages
83 i := 0
84 for {
85 i++
86 msg := client.NewMessage(topic, []byte(fmt.Sprintf("message %d", i)))
87
88 err = publisher.PublishMessage(msg)
89 if err != nil {
90 slog.Error("failed to publish message", "error", err)
91 continue
92 }
93
94 slog.Info("message sent")
95
96 time.Sleep(time.Millisecond * 500)
97 }
98}