forked from tangled.org/core
this repo has no description

eventconsumer: extract knotclient/events into its own package

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled 85720a2c fc4ba7b2

+39
eventconsumer/knot.go
···
··· 1 + package eventconsumer 2 + 3 + import ( 4 + "fmt" 5 + "net/url" 6 + ) 7 + 8 + type KnotSource struct { 9 + Knot string 10 + } 11 + 12 + func (k KnotSource) Key() string { 13 + return k.Knot 14 + } 15 + 16 + func (k KnotSource) Url(cursor int64, dev bool) (*url.URL, error) { 17 + scheme := "wss" 18 + if dev { 19 + scheme = "ws" 20 + } 21 + 22 + u, err := url.Parse(scheme + "://" + k.Knot + "/events") 23 + if err != nil { 24 + return nil, err 25 + } 26 + 27 + if cursor != 0 { 28 + query := url.Values{} 29 + query.Add("cursor", fmt.Sprintf("%d", cursor)) 30 + u.RawQuery = query.Encode() 31 + } 32 + return u, nil 33 + } 34 + 35 + func NewKnotSource(knot string) KnotSource { 36 + return KnotSource{ 37 + Knot: knot, 38 + } 39 + }
+39
eventconsumer/spindle.go
···
··· 1 + package eventconsumer 2 + 3 + import ( 4 + "fmt" 5 + "net/url" 6 + ) 7 + 8 + type SpindleSource struct { 9 + Spindle string 10 + } 11 + 12 + func (s SpindleSource) Key() string { 13 + return s.Spindle 14 + } 15 + 16 + func (s SpindleSource) Url(cursor int64, dev bool) (*url.URL, error) { 17 + scheme := "wss" 18 + if dev { 19 + scheme = "ws" 20 + } 21 + 22 + u, err := url.Parse(scheme + "://" + s.Spindle + "/events") 23 + if err != nil { 24 + return nil, err 25 + } 26 + 27 + if cursor != 0 { 28 + query := url.Values{} 29 + query.Add("cursor", fmt.Sprintf("%d", cursor)) 30 + u.RawQuery = query.Encode() 31 + } 32 + return u, nil 33 + } 34 + 35 + func NewSpindleSource(spindle string) SpindleSource { 36 + return SpindleSource{ 37 + Spindle: spindle, 38 + } 39 + }
knotclient/cursor/memory.go eventconsumer/cursor/memory.go
knotclient/cursor/redis.go eventconsumer/cursor/redis.go
knotclient/cursor/sqlite.go eventconsumer/cursor/sqlite.go
knotclient/cursor/store.go eventconsumer/cursor/store.go
+25 -48
knotclient/events.go eventconsumer/consumer.go
··· 1 - package knotclient 2 3 import ( 4 "context" 5 "encoding/json" 6 - "fmt" 7 "log/slog" 8 "math/rand" 9 "net/url" 10 "sync" 11 "time" 12 13 - "tangled.sh/tangled.sh/core/knotclient/cursor" 14 "tangled.sh/tangled.sh/core/log" 15 16 "github.com/gorilla/websocket" 17 ) 18 19 - type ProcessFunc func(ctx context.Context, source EventSource, message Message) error 20 21 type Message struct { 22 Rkey string ··· 26 } 27 28 type ConsumerConfig struct { 29 - Sources map[EventSource]struct{} 30 ProcessFunc ProcessFunc 31 RetryInterval time.Duration 32 MaxRetryInterval time.Duration ··· 40 41 func NewConsumerConfig() *ConsumerConfig { 42 return &ConsumerConfig{ 43 - Sources: make(map[EventSource]struct{}), 44 } 45 } 46 47 - type EventSource struct { 48 - Knot string 49 } 50 51 - func NewEventSource(knot string) EventSource { 52 - return EventSource{ 53 - Knot: knot, 54 - } 55 - } 56 - 57 - type EventConsumer struct { 58 wg sync.WaitGroup 59 dialer *websocket.Dialer 60 connMap sync.Map ··· 67 cfg ConsumerConfig 68 } 69 70 - func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) { 71 - scheme := "wss" 72 - if e.cfg.Dev { 73 - scheme = "ws" 74 - } 75 - 76 - u, err := url.Parse(scheme + "://" + s.Knot + "/events") 77 - if err != nil { 78 - return nil, err 79 - } 80 - 81 - if cursor != 0 { 82 - query := url.Values{} 83 - query.Add("cursor", fmt.Sprintf("%d", cursor)) 84 - u.RawQuery = query.Encode() 85 - } 86 - return u, nil 87 - } 88 - 89 type job struct { 90 - source EventSource 91 message []byte 92 } 93 94 - func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { 95 if cfg.RetryInterval == 0 { 96 cfg.RetryInterval = 15 * time.Minute 97 } ··· 105 cfg.MaxRetryInterval = 1 * time.Hour 106 } 107 if cfg.Logger == nil { 108 - cfg.Logger = log.New("eventconsumer") 109 } 110 if cfg.QueueSize == 0 { 111 cfg.QueueSize = 100 ··· 113 if cfg.CursorStore == nil { 114 cfg.CursorStore = &cursor.MemoryStore{} 115 } 116 - return &EventConsumer{ 117 cfg: cfg, 118 dialer: websocket.DefaultDialer, 119 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue ··· 122 } 123 } 124 125 - func (c *EventConsumer) Start(ctx context.Context) { 126 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 127 128 // start workers ··· 138 } 139 } 140 141 - func (c *EventConsumer) Stop() { 142 c.connMap.Range(func(_, val any) bool { 143 if conn, ok := val.(*websocket.Conn); ok { 144 conn.Close() ··· 149 close(c.jobQueue) 150 } 151 152 - func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) { 153 // we are already listening to this source 154 if _, ok := c.cfg.Sources[s]; ok { 155 c.logger.Info("source already present", "source", s) ··· 163 c.cfgMu.Unlock() 164 } 165 166 - func (c *EventConsumer) worker(ctx context.Context) { 167 defer c.wg.Done() 168 for { 169 select { ··· 177 var msg Message 178 err := json.Unmarshal(j.message, &msg) 179 if err != nil { 180 - c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) 181 return 182 } 183 184 // update cursor 185 - c.cfg.CursorStore.Set(j.source.Knot, time.Now().UnixNano()) 186 187 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 188 c.logger.Error("error processing message", "source", j.source, "err", err) ··· 191 } 192 } 193 194 - func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) { 195 defer c.wg.Done() 196 retryInterval := c.cfg.RetryInterval 197 for { ··· 224 } 225 } 226 227 - func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error { 228 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 229 defer cancel() 230 231 - cursor := c.cfg.CursorStore.Get(source.Knot) 232 233 - u, err := c.buildUrl(source, cursor) 234 if err != nil { 235 return err 236 }
··· 1 + package eventconsumer 2 3 import ( 4 "context" 5 "encoding/json" 6 "log/slog" 7 "math/rand" 8 "net/url" 9 "sync" 10 "time" 11 12 + "tangled.sh/tangled.sh/core/eventconsumer/cursor" 13 "tangled.sh/tangled.sh/core/log" 14 15 "github.com/gorilla/websocket" 16 ) 17 18 + type ProcessFunc func(ctx context.Context, source Source, message Message) error 19 20 type Message struct { 21 Rkey string ··· 25 } 26 27 type ConsumerConfig struct { 28 + Sources map[Source]struct{} 29 ProcessFunc ProcessFunc 30 RetryInterval time.Duration 31 MaxRetryInterval time.Duration ··· 39 40 func NewConsumerConfig() *ConsumerConfig { 41 return &ConsumerConfig{ 42 + Sources: make(map[Source]struct{}), 43 } 44 } 45 46 + type Source interface { 47 + // url to start streaming events from 48 + Url(cursor int64, dev bool) (*url.URL, error) 49 + // cache key for cursor storage 50 + Key() string 51 } 52 53 + type Consumer struct { 54 wg sync.WaitGroup 55 dialer *websocket.Dialer 56 connMap sync.Map ··· 63 cfg ConsumerConfig 64 } 65 66 type job struct { 67 + source Source 68 message []byte 69 } 70 71 + func NewConsumer(cfg ConsumerConfig) *Consumer { 72 if cfg.RetryInterval == 0 { 73 cfg.RetryInterval = 15 * time.Minute 74 } ··· 82 cfg.MaxRetryInterval = 1 * time.Hour 83 } 84 if cfg.Logger == nil { 85 + cfg.Logger = log.New("consumer") 86 } 87 if cfg.QueueSize == 0 { 88 cfg.QueueSize = 100 ··· 90 if cfg.CursorStore == nil { 91 cfg.CursorStore = &cursor.MemoryStore{} 92 } 93 + return &Consumer{ 94 cfg: cfg, 95 dialer: websocket.DefaultDialer, 96 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue ··· 99 } 100 } 101 102 + func (c *Consumer) Start(ctx context.Context) { 103 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 104 105 // start workers ··· 115 } 116 } 117 118 + func (c *Consumer) Stop() { 119 c.connMap.Range(func(_, val any) bool { 120 if conn, ok := val.(*websocket.Conn); ok { 121 conn.Close() ··· 126 close(c.jobQueue) 127 } 128 129 + func (c *Consumer) AddSource(ctx context.Context, s Source) { 130 // we are already listening to this source 131 if _, ok := c.cfg.Sources[s]; ok { 132 c.logger.Info("source already present", "source", s) ··· 140 c.cfgMu.Unlock() 141 } 142 143 + func (c *Consumer) worker(ctx context.Context) { 144 defer c.wg.Done() 145 for { 146 select { ··· 154 var msg Message 155 err := json.Unmarshal(j.message, &msg) 156 if err != nil { 157 + c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err) 158 return 159 } 160 161 // update cursor 162 + c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano()) 163 164 if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil { 165 c.logger.Error("error processing message", "source", j.source, "err", err) ··· 168 } 169 } 170 171 + func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 172 defer c.wg.Done() 173 retryInterval := c.cfg.RetryInterval 174 for { ··· 201 } 202 } 203 204 + func (c *Consumer) runConnection(ctx context.Context, source Source) error { 205 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 206 defer cancel() 207 208 + cursor := c.cfg.CursorStore.Get(source.Key()) 209 210 + u, err := source.Url(cursor, c.cfg.Dev) 211 if err != nil { 212 return err 213 }
+2 -2
spindle/ingester.go
··· 6 "fmt" 7 8 "tangled.sh/tangled.sh/core/api/tangled" 9 - "tangled.sh/tangled.sh/core/knotclient" 10 11 "github.com/bluesky-social/jetstream/pkg/models" 12 ) ··· 128 } 129 130 // add this knot to the event consumer 131 - src := knotclient.NewEventSource(record.Knot) 132 s.ks.AddSource(context.Background(), src) 133 134 return nil
··· 6 "fmt" 7 8 "tangled.sh/tangled.sh/core/api/tangled" 9 + "tangled.sh/tangled.sh/core/eventconsumer" 10 11 "github.com/bluesky-social/jetstream/pkg/models" 12 ) ··· 128 } 129 130 // add this knot to the event consumer 131 + src := eventconsumer.NewKnotSource(record.Knot) 132 s.ks.AddSource(context.Background(), src) 133 134 return nil
+8 -8
spindle/server.go
··· 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/jetstream" 13 - "tangled.sh/tangled.sh/core/knotclient" 14 - "tangled.sh/tangled.sh/core/knotclient/cursor" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" ··· 35 eng *engine.Engine 36 jq *queue.Queue 37 cfg *config.Config 38 - ks *knotclient.EventConsumer 39 } 40 41 func Run(ctx context.Context) error { ··· 114 // for each incoming sh.tangled.pipeline, we execute 115 // spindle.processPipeline, which in turn enqueues the pipeline 116 // job in the above registered queue. 117 - ccfg := knotclient.NewConsumerConfig() 118 ccfg.Logger = logger 119 ccfg.Dev = cfg.Server.Dev 120 ccfg.ProcessFunc = spindle.processPipeline ··· 125 } 126 for _, knot := range knownKnots { 127 logger.Info("adding source start", "knot", knot) 128 - ccfg.Sources[knotclient.EventSource{knot}] = struct{}{} 129 } 130 - spindle.ks = knotclient.NewEventConsumer(*ccfg) 131 132 go func() { 133 logger.Info("starting knot event consumer") ··· 151 return mux 152 } 153 154 - func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error { 155 if msg.Nsid == tangled.PipelineNSID { 156 pipeline := tangled.Pipeline{} 157 err := json.Unmarshal(msg.EventJson, &pipeline) ··· 179 } 180 181 pipelineId := models.PipelineId{ 182 - Knot: src.Knot, 183 Rkey: msg.Rkey, 184 } 185
··· 9 10 "github.com/go-chi/chi/v5" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 + "tangled.sh/tangled.sh/core/eventconsumer" 13 + "tangled.sh/tangled.sh/core/eventconsumer/cursor" 14 "tangled.sh/tangled.sh/core/jetstream" 15 "tangled.sh/tangled.sh/core/log" 16 "tangled.sh/tangled.sh/core/notifier" 17 "tangled.sh/tangled.sh/core/rbac" ··· 35 eng *engine.Engine 36 jq *queue.Queue 37 cfg *config.Config 38 + ks *eventconsumer.Consumer 39 } 40 41 func Run(ctx context.Context) error { ··· 114 // for each incoming sh.tangled.pipeline, we execute 115 // spindle.processPipeline, which in turn enqueues the pipeline 116 // job in the above registered queue. 117 + ccfg := eventconsumer.NewConsumerConfig() 118 ccfg.Logger = logger 119 ccfg.Dev = cfg.Server.Dev 120 ccfg.ProcessFunc = spindle.processPipeline ··· 125 } 126 for _, knot := range knownKnots { 127 logger.Info("adding source start", "knot", knot) 128 + ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 129 } 130 + spindle.ks = eventconsumer.NewConsumer(*ccfg) 131 132 go func() { 133 logger.Info("starting knot event consumer") ··· 151 return mux 152 } 153 154 + func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 155 if msg.Nsid == tangled.PipelineNSID { 156 pipeline := tangled.Pipeline{} 157 err := json.Unmarshal(msg.EventJson, &pipeline) ··· 179 } 180 181 pipelineId := models.PipelineId{ 182 + Knot: src.Key(), 183 Rkey: msg.Rkey, 184 } 185
+24 -5
spindle/stream.go
··· 2 3 import ( 4 "context" 5 "fmt" 6 "net/http" 7 "strconv" ··· 206 } 207 208 func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 209 - ops, err := s.db.GetEvents(*cursor) 210 if err != nil { 211 s.l.Debug("err", "err", err) 212 return err 213 } 214 - s.l.Debug("ops", "ops", ops) 215 216 - for _, op := range ops { 217 - if err := conn.WriteJSON(op); err != nil { 218 s.l.Debug("err", "err", err) 219 return err 220 } 221 - *cursor = op.Created 222 } 223 224 return nil
··· 2 3 import ( 4 "context" 5 + "encoding/json" 6 "fmt" 7 "net/http" 8 "strconv" ··· 207 } 208 209 func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error { 210 + events, err := s.db.GetEvents(*cursor) 211 if err != nil { 212 s.l.Debug("err", "err", err) 213 return err 214 } 215 + s.l.Debug("ops", "ops", events) 216 217 + for _, event := range events { 218 + // first extract the inner json into a map 219 + var eventJson map[string]any 220 + err := json.Unmarshal([]byte(event.EventJson), &eventJson) 221 + if err != nil { 222 + s.l.Error("failed to unmarshal event", "err", err) 223 + return err 224 + } 225 + 226 + jsonMsg, err := json.Marshal(map[string]any{ 227 + "rkey": event.Rkey, 228 + "nsid": event.Nsid, 229 + "event": eventJson, 230 + }) 231 + if err != nil { 232 + s.l.Error("failed to marshal record", "err", err) 233 + return err 234 + } 235 + 236 + if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil { 237 s.l.Debug("err", "err", err) 238 return err 239 } 240 + *cursor = event.Created 241 } 242 243 return nil