Monorepo for Tangled tangled.org

jetstream: init separate package

anirudh.fi eb6dd9fe a7c75c2c

verified
Changed files
+152 -109
cmd
knotserver
jetstream
knotserver
+11 -1
cmd/knotserver/main.go
··· 4 4 "context" 5 5 "net/http" 6 6 7 + "github.com/sotangled/tangled/api/tangled" 8 + "github.com/sotangled/tangled/jetstream" 7 9 "github.com/sotangled/tangled/knotserver" 8 10 "github.com/sotangled/tangled/knotserver/config" 9 11 "github.com/sotangled/tangled/knotserver/db" ··· 40 42 return 41 43 } 42 44 43 - mux, err := knotserver.Setup(ctx, c, db, e, l) 45 + jc, err := jetstream.NewJetstreamClient("knotserver", []string{ 46 + tangled.PublicKeyNSID, 47 + tangled.KnotMemberNSID, 48 + }, nil, db) 49 + if err != nil { 50 + l.Error("failed to setup jetstream", "error", err) 51 + } 52 + 53 + mux, err := knotserver.Setup(ctx, c, db, e, jc, l) 44 54 if err != nil { 45 55 l.Error("failed to setup server", "error", err) 46 56 return
+136
jetstream/jetstream.go
··· 1 + package jetstream 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "sync" 7 + "time" 8 + 9 + "github.com/bluesky-social/jetstream/pkg/client" 10 + "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 + "github.com/bluesky-social/jetstream/pkg/models" 12 + "github.com/sotangled/tangled/log" 13 + ) 14 + 15 + type DB interface { 16 + GetLastTimeUs() (int64, error) 17 + SaveLastTimeUs(int64) error 18 + } 19 + 20 + type JetstreamClient struct { 21 + cfg *client.ClientConfig 22 + client *client.Client 23 + ident string 24 + 25 + db DB 26 + reconnectCh chan struct{} 27 + mu sync.RWMutex 28 + } 29 + 30 + func (j *JetstreamClient) AddDid(did string) { 31 + j.mu.Lock() 32 + j.cfg.WantedDids = append(j.cfg.WantedDids, did) 33 + j.mu.Unlock() 34 + j.reconnectCh <- struct{}{} 35 + } 36 + 37 + func (j *JetstreamClient) UpdateDids(dids []string) { 38 + j.mu.Lock() 39 + j.cfg.WantedDids = dids 40 + j.mu.Unlock() 41 + j.reconnectCh <- struct{}{} 42 + } 43 + 44 + func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB) (*JetstreamClient, error) { 45 + if cfg == nil { 46 + cfg = client.DefaultClientConfig() 47 + cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 48 + cfg.WantedCollections = collections 49 + } 50 + 51 + return &JetstreamClient{ 52 + cfg: cfg, 53 + ident: ident, 54 + db: db, 55 + reconnectCh: make(chan struct{}, 1), 56 + }, nil 57 + } 58 + 59 + func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 60 + logger := log.FromContext(ctx) 61 + 62 + pf := func(ctx context.Context, e *models.Event) error { 63 + err := processFunc(ctx, e) 64 + if err != nil { 65 + return err 66 + } 67 + 68 + if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil { 69 + return err 70 + } 71 + 72 + return nil 73 + } 74 + 75 + sched := sequential.NewScheduler(j.ident, logger, pf) 76 + 77 + client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 78 + if err != nil { 79 + return fmt.Errorf("failed to create jetstream client: %w", err) 80 + } 81 + j.client = client 82 + 83 + go func() { 84 + lastTimeUs := j.getLastTimeUs(ctx) 85 + for len(j.cfg.WantedDids) == 0 { 86 + time.Sleep(time.Second) 87 + } 88 + j.connectAndRead(ctx, &lastTimeUs) 89 + }() 90 + 91 + return nil 92 + } 93 + 94 + func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) { 95 + l := log.FromContext(ctx) 96 + for { 97 + select { 98 + case <-j.reconnectCh: 99 + l.Info("(re)connecting jetstream client") 100 + j.client.Scheduler.Shutdown() 101 + if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 102 + l.Error("error reading jetstream", "error", err) 103 + } 104 + default: 105 + if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 106 + l.Error("error reading jetstream", "error", err) 107 + } 108 + } 109 + } 110 + } 111 + 112 + func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 { 113 + l := log.FromContext(ctx) 114 + lastTimeUs, err := j.db.GetLastTimeUs() 115 + if err != nil { 116 + l.Warn("couldn't get last time us, starting from now", "error", err) 117 + lastTimeUs = time.Now().UnixMicro() 118 + err = j.db.SaveLastTimeUs(lastTimeUs) 119 + if err != nil { 120 + l.Error("failed to save last time us") 121 + } 122 + } 123 + 124 + // If last time is older than a week, start from now 125 + if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 126 + lastTimeUs = time.Now().UnixMicro() 127 + l.Warn("last time us is older than a week. discarding that and starting from now") 128 + err = j.db.SaveLastTimeUs(lastTimeUs) 129 + if err != nil { 130 + l.Error("failed to save last time us") 131 + } 132 + } 133 + 134 + l.Info("found last time_us", "time_us", lastTimeUs) 135 + return lastTimeUs 136 + }
+5 -3
knotserver/handler.go
··· 7 7 "net/http" 8 8 9 9 "github.com/go-chi/chi/v5" 10 + "github.com/sotangled/tangled/jetstream" 10 11 "github.com/sotangled/tangled/knotserver/config" 11 12 "github.com/sotangled/tangled/knotserver/db" 12 13 "github.com/sotangled/tangled/rbac" ··· 19 20 type Handle struct { 20 21 c *config.Config 21 22 db *db.DB 22 - jc *JetstreamClient 23 + jc *jetstream.JetstreamClient 23 24 e *rbac.Enforcer 24 25 l *slog.Logger 25 26 ··· 29 30 knotInitialized bool 30 31 } 31 32 32 - func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, l *slog.Logger) (http.Handler, error) { 33 + func Setup(ctx context.Context, c *config.Config, db *db.DB, e *rbac.Enforcer, jc *jetstream.JetstreamClient, l *slog.Logger) (http.Handler, error) { 33 34 r := chi.NewRouter() 34 35 35 36 h := Handle{ ··· 37 38 db: db, 38 39 e: e, 39 40 l: l, 41 + jc: jc, 40 42 init: make(chan struct{}), 41 43 } 42 44 ··· 45 47 return nil, fmt.Errorf("failed to setup enforcer: %w", err) 46 48 } 47 49 48 - err = h.StartJetstream(ctx) 50 + err = h.jc.StartJetstream(ctx, h.processMessages) 49 51 if err != nil { 50 52 return nil, fmt.Errorf("failed to start jetstream: %w", err) 51 53 }
-105
knotserver/jetstream.go
··· 8 8 "net/http" 9 9 "net/url" 10 10 "strings" 11 - "sync" 12 - "time" 13 11 14 - "github.com/bluesky-social/jetstream/pkg/client" 15 - "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 16 12 "github.com/bluesky-social/jetstream/pkg/models" 17 13 "github.com/sotangled/tangled/api/tangled" 18 14 "github.com/sotangled/tangled/knotserver/db" 19 15 "github.com/sotangled/tangled/log" 20 16 ) 21 - 22 - type JetstreamClient struct { 23 - cfg *client.ClientConfig 24 - client *client.Client 25 - reconnectCh chan struct{} 26 - mu sync.RWMutex 27 - } 28 - 29 - func (h *Handle) StartJetstream(ctx context.Context) error { 30 - l := h.l 31 - ctx = log.IntoContext(ctx, l) 32 - collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 33 - dids := []string{} 34 - 35 - cfg := client.DefaultClientConfig() 36 - cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 37 - cfg.WantedCollections = collections 38 - cfg.WantedDids = dids 39 - 40 - sched := sequential.NewScheduler("knotserver", l, h.processMessages) 41 - 42 - client, err := client.NewClient(cfg, l, sched) 43 - if err != nil { 44 - l.Error("failed to create jetstream client", "error", err) 45 - } 46 - 47 - jc := &JetstreamClient{ 48 - cfg: cfg, 49 - client: client, 50 - reconnectCh: make(chan struct{}, 1), 51 - } 52 - 53 - h.jc = jc 54 - 55 - go func() { 56 - lastTimeUs := h.getLastTimeUs(ctx) 57 - for len(h.jc.cfg.WantedDids) == 0 { 58 - time.Sleep(time.Second) 59 - } 60 - h.connectAndRead(ctx, &lastTimeUs) 61 - }() 62 - return nil 63 - } 64 - 65 - func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) { 66 - l := log.FromContext(ctx) 67 - for { 68 - select { 69 - case <-h.jc.reconnectCh: 70 - l.Info("(re)connecting jetstream client") 71 - h.jc.client.Scheduler.Shutdown() 72 - if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 73 - l.Error("error reading jetstream", "error", err) 74 - } 75 - default: 76 - if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 77 - l.Error("error reading jetstream", "error", err) 78 - } 79 - } 80 - } 81 - } 82 - 83 - func (j *JetstreamClient) AddDid(did string) { 84 - j.mu.Lock() 85 - j.cfg.WantedDids = append(j.cfg.WantedDids, did) 86 - j.mu.Unlock() 87 - j.reconnectCh <- struct{}{} 88 - } 89 - 90 - func (j *JetstreamClient) UpdateDids(dids []string) { 91 - j.mu.Lock() 92 - j.cfg.WantedDids = dids 93 - j.mu.Unlock() 94 - j.reconnectCh <- struct{}{} 95 - } 96 - 97 - func (h *Handle) getLastTimeUs(ctx context.Context) int64 { 98 - l := log.FromContext(ctx) 99 - lastTimeUs, err := h.db.GetLastTimeUs() 100 - if err != nil { 101 - l.Warn("couldn't get last time us, starting from now", "error", err) 102 - lastTimeUs = time.Now().UnixMicro() 103 - err = h.db.SaveLastTimeUs(lastTimeUs) 104 - if err != nil { 105 - l.Error("failed to save last time us") 106 - } 107 - } 108 - 109 - // If last time is older than a week, start from now 110 - if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 111 - lastTimeUs = time.Now().UnixMicro() 112 - l.Warn("last time us is older than a week. discarding that and starting from now") 113 - err = h.db.SaveLastTimeUs(lastTimeUs) 114 - if err != nil { 115 - l.Error("failed to save last time us") 116 - } 117 - } 118 - 119 - l.Info("found last time_us", "time_us", lastTimeUs) 120 - return lastTimeUs 121 - } 122 17 123 18 func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 124 19 l := log.FromContext(ctx)