forked from tangled.org/core
this repo has no description

knotclient: introduce event consumer

generic websocket consumer for one or more event streams from knots

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled 46d66329 d2445179

Changed files
+232
cmd
eventconsumer
knotclient
+47
cmd/eventconsumer/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "flag" 6 + "fmt" 7 + "strings" 8 + "time" 9 + 10 + "tangled.sh/tangled.sh/core/knotclient" 11 + ) 12 + 13 + func main() { 14 + sourcesFlag := flag.String("sources", "", "list of wss sources") 15 + retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval") 16 + maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval") 17 + workerCount := flag.Int("workers", 10, "goroutine pool size") 18 + 19 + flag.Parse() 20 + 21 + if *sourcesFlag == "" { 22 + fmt.Println("error: -sources is required") 23 + flag.Usage() 24 + return 25 + } 26 + 27 + sources := strings.Split(*sourcesFlag, ",") 28 + 29 + consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{ 30 + Sources: sources, 31 + ProcessFunc: processEvent, 32 + RetryInterval: *retryFlag, 33 + MaxRetryInterval: *maxRetryFlag, 34 + WorkerCount: *workerCount, 35 + }) 36 + 37 + ctx, cancel := context.WithCancel(context.Background()) 38 + consumer.Start(ctx) 39 + time.Sleep(1 * time.Hour) 40 + cancel() 41 + consumer.Stop() 42 + } 43 + 44 + func processEvent(source string, msg []byte) error { 45 + fmt.Printf("From %s: %s\n", source, string(msg)) 46 + return nil 47 + }
+185
knotclient/events.go
··· 1 + package knotclient 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "math/rand" 7 + "net/url" 8 + "sync" 9 + "time" 10 + 11 + "tangled.sh/tangled.sh/core/log" 12 + 13 + "github.com/gorilla/websocket" 14 + ) 15 + 16 + type ProcessFunc func(source string, message []byte) error 17 + 18 + type ConsumerConfig struct { 19 + Sources []string 20 + ProcessFunc ProcessFunc 21 + RetryInterval time.Duration 22 + MaxRetryInterval time.Duration 23 + ConnectionTimeout time.Duration 24 + WorkerCount int 25 + QueueSize int 26 + Logger *slog.Logger 27 + } 28 + 29 + type EventConsumer struct { 30 + cfg ConsumerConfig 31 + wg sync.WaitGroup 32 + dialer *websocket.Dialer 33 + connMap sync.Map 34 + jobQueue chan job 35 + logger *slog.Logger 36 + randSource *rand.Rand 37 + } 38 + 39 + type job struct { 40 + source string 41 + message []byte 42 + } 43 + 44 + func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { 45 + if cfg.RetryInterval == 0 { 46 + cfg.RetryInterval = 15 * time.Minute 47 + } 48 + if cfg.ConnectionTimeout == 0 { 49 + cfg.ConnectionTimeout = 10 * time.Second 50 + } 51 + if cfg.WorkerCount <= 0 { 52 + cfg.WorkerCount = 5 53 + } 54 + if cfg.MaxRetryInterval == 0 { 55 + cfg.MaxRetryInterval = 1 * time.Hour 56 + } 57 + if cfg.Logger == nil { 58 + cfg.Logger = log.New("eventconsumer") 59 + } 60 + if cfg.QueueSize == 0 { 61 + cfg.QueueSize = 100 62 + } 63 + return &EventConsumer{ 64 + cfg: cfg, 65 + dialer: websocket.DefaultDialer, 66 + jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 67 + logger: cfg.Logger, 68 + randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 69 + } 70 + } 71 + 72 + func (c *EventConsumer) Start(ctx context.Context) { 73 + // start workers 74 + for range c.cfg.WorkerCount { 75 + c.wg.Add(1) 76 + go c.worker(ctx) 77 + } 78 + 79 + // start streaming 80 + for _, source := range c.cfg.Sources { 81 + c.wg.Add(1) 82 + go c.startConnectionLoop(ctx, source) 83 + } 84 + } 85 + 86 + func (c *EventConsumer) Stop() { 87 + c.connMap.Range(func(_, val any) bool { 88 + if conn, ok := val.(*websocket.Conn); ok { 89 + conn.Close() 90 + } 91 + return true 92 + }) 93 + c.wg.Wait() 94 + close(c.jobQueue) 95 + } 96 + 97 + func (c *EventConsumer) worker(ctx context.Context) { 98 + defer c.wg.Done() 99 + for { 100 + select { 101 + case <-ctx.Done(): 102 + return 103 + case j, ok := <-c.jobQueue: 104 + if !ok { 105 + return 106 + } 107 + if err := c.cfg.ProcessFunc(j.source, j.message); err != nil { 108 + c.logger.Error("error processing message", "source", j.source, "err", err) 109 + } 110 + } 111 + } 112 + } 113 + 114 + func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) { 115 + defer c.wg.Done() 116 + retryInterval := c.cfg.RetryInterval 117 + for { 118 + select { 119 + case <-ctx.Done(): 120 + return 121 + default: 122 + err := c.runConnection(ctx, source) 123 + if err != nil { 124 + c.logger.Error("connection failed", "source", source, "err", err) 125 + } 126 + 127 + // apply jitter 128 + jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5)) 129 + delay := retryInterval + jitter 130 + 131 + if retryInterval < c.cfg.MaxRetryInterval { 132 + retryInterval *= 2 133 + if retryInterval > c.cfg.MaxRetryInterval { 134 + retryInterval = c.cfg.MaxRetryInterval 135 + } 136 + } 137 + c.logger.Info("retrying connection", "source", source, "delay", delay) 138 + select { 139 + case <-time.After(delay): 140 + case <-ctx.Done(): 141 + return 142 + } 143 + } 144 + } 145 + } 146 + 147 + func (c *EventConsumer) runConnection(ctx context.Context, source string) error { 148 + connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 149 + defer cancel() 150 + 151 + u, err := url.Parse(source) 152 + if err != nil { 153 + return err 154 + } 155 + 156 + conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) 157 + if err != nil { 158 + return err 159 + } 160 + defer conn.Close() 161 + c.connMap.Store(source, conn) 162 + defer c.connMap.Delete(source) 163 + 164 + c.logger.Info("connected", "source", source) 165 + 166 + for { 167 + select { 168 + case <-ctx.Done(): 169 + return nil 170 + default: 171 + msgType, msg, err := conn.ReadMessage() 172 + if err != nil { 173 + return err 174 + } 175 + if msgType != websocket.TextMessage { 176 + continue 177 + } 178 + select { 179 + case c.jobQueue <- job{source: source, message: msg}: 180 + case <-ctx.Done(): 181 + return nil 182 + } 183 + } 184 + } 185 + }