Monorepo for Tangled
at master 266 lines 5.6 kB view raw
1package eventconsumer 2 3import ( 4 "context" 5 "encoding/json" 6 "log/slog" 7 "math/rand" 8 "net/url" 9 "sync" 10 "time" 11 12 "tangled.org/core/eventconsumer/cursor" 13 "tangled.org/core/log" 14 15 "github.com/avast/retry-go/v4" 16 "github.com/gorilla/websocket" 17) 18 19type ProcessFunc func(ctx context.Context, source Source, message Message) error 20 21type Message struct { 22 Rkey string 23 Nsid string 24 Created int64 `json:"created"` 25 EventJson json.RawMessage `json:"event"` 26} 27 28type ConsumerConfig struct { 29 Sources map[Source]struct{} 30 ProcessFunc ProcessFunc 31 RetryInterval time.Duration 32 MaxRetryInterval time.Duration 33 ConnectionTimeout time.Duration 34 WorkerCount int 35 QueueSize int 36 Logger *slog.Logger 37 Dev bool 38 CursorStore cursor.Store 39} 40 41func NewConsumerConfig() *ConsumerConfig { 42 return &ConsumerConfig{ 43 Sources: make(map[Source]struct{}), 44 } 45} 46 47type Source interface { 48 // url to start streaming events from 49 Url(cursor int64, dev bool) (*url.URL, error) 50 // cache key for cursor storage 51 Key() string 52} 53 54type Consumer struct { 55 wg sync.WaitGroup 56 dialer *websocket.Dialer 57 connMap sync.Map 58 jobQueue chan job 59 logger *slog.Logger 60 randSource *rand.Rand 61 62 // rw lock over edits to ConsumerConfig 63 cfgMu sync.RWMutex 64 cfg ConsumerConfig 65} 66 67type job struct { 68 source Source 69 message []byte 70} 71 72func NewConsumer(cfg ConsumerConfig) *Consumer { 73 if cfg.RetryInterval == 0 { 74 cfg.RetryInterval = 15 * time.Minute 75 } 76 if cfg.ConnectionTimeout == 0 { 77 cfg.ConnectionTimeout = 10 * time.Second 78 } 79 if cfg.WorkerCount <= 0 { 80 cfg.WorkerCount = 5 81 } 82 if cfg.MaxRetryInterval == 0 { 83 cfg.MaxRetryInterval = 1 * time.Hour 84 } 85 if cfg.Logger == nil { 86 cfg.Logger = log.New("consumer") 87 } 88 if cfg.QueueSize == 0 { 89 cfg.QueueSize = 100 90 } 91 if cfg.CursorStore == nil { 92 cfg.CursorStore = &cursor.MemoryStore{} 93 } 94 return &Consumer{ 95 cfg: cfg, 96 dialer: websocket.DefaultDialer, 97 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 98 logger: cfg.Logger, 99 randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 100 } 101} 102 103func (c *Consumer) Start(ctx context.Context) { 104 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 105 106 // start workers 107 for range c.cfg.WorkerCount { 108 c.wg.Add(1) 109 go c.worker(ctx) 110 } 111 112 // start streaming 113 for source := range c.cfg.Sources { 114 c.wg.Add(1) 115 go c.startConnectionLoop(ctx, source) 116 } 117} 118 119func (c *Consumer) Stop() { 120 c.connMap.Range(func(_, val any) bool { 121 if conn, ok := val.(*websocket.Conn); ok { 122 conn.Close() 123 } 124 return true 125 }) 126 c.wg.Wait() 127 close(c.jobQueue) 128} 129 130func (c *Consumer) AddSource(ctx context.Context, s Source) { 131 // we are already listening to this source 132 if _, ok := c.cfg.Sources[s]; ok { 133 c.logger.Info("source already present", "source", s) 134 return 135 } 136 137 c.cfgMu.Lock() 138 c.cfg.Sources[s] = struct{}{} 139 c.wg.Add(1) 140 go c.startConnectionLoop(ctx, s) 141 c.cfgMu.Unlock() 142} 143 144func (c *Consumer) worker(ctx context.Context) { 145 defer c.wg.Done() 146 for { 147 select { 148 case <-ctx.Done(): 149 return 150 case j, ok := <-c.jobQueue: 151 if !ok { 152 return 153 } 154 155 var msg Message 156 err := json.Unmarshal(j.message, &msg) 157 if err != nil { 158 c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err) 159 return 160 } 161 162 cursorVal := msg.Created 163 if cursorVal == 0 { 164 cursorVal = time.Now().UnixNano() 165 } 166 c.cfg.CursorStore.Set(j.source.Key(), cursorVal) 167 168 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 169 c.logger.Error("error processing message", "source", j.source, "err", err) 170 } 171 } 172 } 173} 174 175func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 176 defer c.wg.Done() 177 178 // attempt connection initially 179 err := c.runConnection(ctx, source) 180 if err != nil { 181 c.logger.Error("failed to run connection", "err", err) 182 } 183 184 timer := time.NewTimer(1 * time.Minute) 185 defer timer.Stop() 186 187 // every subsequent attempt is delayed by 1 minute 188 for { 189 select { 190 case <-ctx.Done(): 191 return 192 case <-timer.C: 193 err := c.runConnection(ctx, source) 194 if err != nil { 195 c.logger.Error("failed to run connection", "err", err) 196 } 197 timer.Reset(1 * time.Minute) 198 } 199 } 200} 201 202func (c *Consumer) runConnection(ctx context.Context, source Source) error { 203 cursor := c.cfg.CursorStore.Get(source.Key()) 204 205 u, err := source.Url(cursor, c.cfg.Dev) 206 if err != nil { 207 return err 208 } 209 210 c.logger.Info("connecting", "url", u.String()) 211 212 retryOpts := []retry.Option{ 213 retry.Attempts(0), // infinite attempts 214 retry.DelayType(retry.BackOffDelay), 215 retry.Delay(c.cfg.RetryInterval), 216 retry.MaxDelay(c.cfg.MaxRetryInterval), 217 retry.MaxJitter(c.cfg.RetryInterval / 5), 218 retry.OnRetry(func(n uint, err error) { 219 c.logger.Info("retrying connection", 220 "source", source, 221 "url", u.String(), 222 "attempt", n+1, 223 "err", err, 224 ) 225 }), 226 retry.Context(ctx), 227 } 228 229 var conn *websocket.Conn 230 231 err = retry.Do(func() error { 232 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 233 defer cancel() 234 conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil) 235 return err 236 }, retryOpts...) 237 if err != nil { 238 return err 239 } 240 241 c.connMap.Store(source, conn) 242 defer conn.Close() 243 defer c.connMap.Delete(source) 244 245 c.logger.Info("connected", "source", source) 246 247 for { 248 select { 249 case <-ctx.Done(): 250 return nil 251 default: 252 msgType, msg, err := conn.ReadMessage() 253 if err != nil { 254 return err 255 } 256 if msgType != websocket.TextMessage { 257 continue 258 } 259 select { 260 case c.jobQueue <- job{source: source, message: msg}: 261 case <-ctx.Done(): 262 return nil 263 } 264 } 265 } 266}