forked from tangled.org/core
Monorepo for Tangled

knotclient: introduce CursorStore

consumers can configure a cursor-store, where cursors of individual
event sources are stored. the module provides an in-memory store and a
redis-backed store.

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled ee71e636 04512271

Changed files
+77 -9
cmd
eventconsumer
knotclient
+11 -7
cmd/eventconsumer/main.go
··· 11 11 ) 12 12 13 13 func main() { 14 - sourcesFlag := flag.String("sources", "", "list of wss sources") 14 + knots := flag.String("knots", "", "list of knots to connect to") 15 15 retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval") 16 16 maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval") 17 17 workerCount := flag.Int("workers", 10, "goroutine pool size") 18 18 19 19 flag.Parse() 20 20 21 - if *sourcesFlag == "" { 22 - fmt.Println("error: -sources is required") 21 + if *knots == "" { 22 + fmt.Println("error: -knots is required") 23 23 flag.Usage() 24 24 return 25 25 } 26 26 27 - sources := strings.Split(*sourcesFlag, ",") 27 + var srcs []knotclient.EventSource 28 + for k := range strings.SplitSeq(*knots, ",") { 29 + srcs = append(srcs, knotclient.EventSource{k}) 30 + } 28 31 29 32 consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{ 30 - Sources: sources, 33 + Sources: srcs, 31 34 ProcessFunc: processEvent, 32 35 RetryInterval: *retryFlag, 33 36 MaxRetryInterval: *maxRetryFlag, 34 37 WorkerCount: *workerCount, 38 + Dev: true, 35 39 }) 36 40 37 41 ctx, cancel := context.WithCancel(context.Background()) ··· 41 45 consumer.Stop() 42 46 } 43 47 44 - func processEvent(source string, msg []byte) error { 45 - fmt.Printf("From %s: %s\n", source, string(msg)) 48 + func processEvent(source knotclient.EventSource, msg knotclient.Message) error { 49 + fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson)) 46 50 return nil 47 51 }
+66 -2
knotclient/events.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "fmt" 6 7 "log/slog" 7 8 "math/rand" 8 9 "net/url" 9 10 "sync" 10 11 "time" 11 12 13 + "tangled.sh/tangled.sh/core/appview/cache" 12 14 "tangled.sh/tangled.sh/core/log" 13 15 14 16 "github.com/gorilla/websocket" ··· 20 22 Rkey string 21 23 Nsid string 22 24 // do not full deserialize this portion of the message, processFunc can do that 23 - EventJson json.RawMessage 25 + EventJson json.RawMessage `json:"event"` 24 26 } 25 27 26 28 type ConsumerConfig struct { ··· 33 35 QueueSize int 34 36 Logger *slog.Logger 35 37 Dev bool 38 + CursorStore CursorStore 36 39 } 37 40 38 41 type EventSource struct { ··· 58 61 mu sync.RWMutex 59 62 } 60 63 64 + type CursorStore interface { 65 + Set(knot, cursor string) 66 + Get(knot string) (cursor string) 67 + } 68 + 69 + type RedisCursorStore struct { 70 + rdb *cache.Cache 71 + } 72 + 73 + func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore { 74 + return RedisCursorStore{ 75 + rdb: cache, 76 + } 77 + } 78 + 79 + const ( 80 + cursorKey = "cursor:%s" 81 + ) 82 + 83 + func (r *RedisCursorStore) Set(knot, cursor string) { 84 + key := fmt.Sprintf(cursorKey, knot) 85 + r.rdb.Set(context.Background(), key, cursor, 0) 86 + } 87 + 88 + func (r *RedisCursorStore) Get(knot string) (cursor string) { 89 + key := fmt.Sprintf(cursorKey, knot) 90 + val, err := r.rdb.Get(context.Background(), key).Result() 91 + if err != nil { 92 + return "" 93 + } 94 + 95 + return val 96 + } 97 + 98 + type MemoryCursorStore struct { 99 + store sync.Map 100 + } 101 + 102 + func (m *MemoryCursorStore) Set(knot, cursor string) { 103 + m.store.Store(knot, cursor) 104 + } 105 + 106 + func (m *MemoryCursorStore) Get(knot string) (cursor string) { 107 + if result, ok := m.store.Load(knot); ok { 108 + if val, ok := result.(string); ok { 109 + return val 110 + } 111 + } 112 + 113 + return "" 114 + } 115 + 61 116 func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) { 62 117 scheme := "wss" 63 118 if e.cfg.Dev { ··· 100 155 } 101 156 if cfg.QueueSize == 0 { 102 157 cfg.QueueSize = 100 158 + } 159 + if cfg.CursorStore == nil { 160 + cfg.CursorStore = &MemoryCursorStore{} 103 161 } 104 162 return &EventConsumer{ 105 163 cfg: cfg, ··· 111 169 } 112 170 113 171 func (c *EventConsumer) Start(ctx context.Context) { 172 + c.cfg.Logger.Info("starting consumer", "config", c.cfg) 173 + 114 174 // start workers 115 175 for range c.cfg.WorkerCount { 116 176 c.wg.Add(1) ··· 160 220 c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) 161 221 return 162 222 } 223 + 224 + // update cursor 225 + c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey) 226 + 163 227 if err := c.cfg.ProcessFunc(j.source, msg); err != nil { 164 228 c.logger.Error("error processing message", "source", j.source, "err", err) 165 229 } ··· 204 268 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 205 269 defer cancel() 206 270 207 - u, err := url.Parse(source) 271 + cursor := c.cfg.CursorStore.Get(source.Knot) 208 272 209 273 u, err := c.buildUrl(source, cursor) 210 274 if err != nil {