An experimental pub/sub client and server project.
at main 1.9 kB view raw
1package main 2 3import ( 4 "context" 5 "flag" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "github.com/willdot/messagebroker/client" 11 "github.com/willdot/messagebroker/internal/server" 12) 13 14// var publish *bool 15// var consume *bool 16var consumeFrom *int 17var clientType *string 18 19const ( 20 topic = "topic-a" 21) 22 23func main() { 24 clientType = flag.String("client-type", "consume", "consume or publish (default consume)") 25 // publish = flag.Bool("publish", false, "will publish messages every 500ms until client is stopped") 26 // consume = flag.Bool("consume", false, "will consume messages until client is stopped") 27 consumeFrom = flag.Int("consume-from", -1, "index of message to start consuming from. If not set it will consume from the most recent") 28 flag.Parse() 29 30 switch *clientType { 31 case "consume": 32 consume() 33 case "publish": 34 sendMessages() 35 default: 36 fmt.Println("unknown client type") 37 } 38} 39 40func consume() { 41 sub, err := client.NewSubscriber(":3000") 42 if err != nil { 43 panic(err) 44 } 45 46 defer func() { 47 _ = sub.Close() 48 }() 49 startAt := 0 50 startAtType := server.Current 51 if *consumeFrom > -1 { 52 startAtType = server.From 53 startAt = *consumeFrom 54 } 55 56 err = sub.SubscribeToTopics([]string{topic}, startAtType, startAt) 57 if err != nil { 58 panic(err) 59 } 60 61 consumer := sub.Consume(context.Background()) 62 if consumer.Err != nil { 63 panic(err) 64 } 65 66 for msg := range consumer.Messages() { 67 slog.Info("received message", "message", string(msg.Data)) 68 msg.Ack(true) 69 } 70} 71 72func sendMessages() { 73 publisher, err := client.NewPublisher("localhost:3000") 74 if err != nil { 75 panic(err) 76 } 77 78 defer func() { 79 _ = publisher.Close() 80 }() 81 82 // send some messages 83 i := 0 84 for { 85 i++ 86 msg := client.NewMessage(topic, []byte(fmt.Sprintf("message %d", i))) 87 88 err = publisher.PublishMessage(msg) 89 if err != nil { 90 slog.Error("failed to publish message", "error", err) 91 continue 92 } 93 94 slog.Info("message sent") 95 96 time.Sleep(time.Millisecond * 500) 97 } 98}