Monorepo for Tangled tangled.org
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/bluesky-social/jetstream/pkg/client" 10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/sotangled/tangled/log" 13) 14 15type DB interface { 16 GetLastTimeUs() (int64, error) 17 SaveLastTimeUs(int64) error 18} 19 20type JetstreamClient struct { 21 cfg *client.ClientConfig 22 client *client.Client 23 ident string 24 25 db DB 26 reconnectCh chan struct{} 27 mu sync.RWMutex 28} 29 30func (j *JetstreamClient) AddDid(did string) { 31 j.mu.Lock() 32 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 33 j.mu.Unlock() 34 j.reconnectCh <- struct{}{} 35} 36 37func (j *JetstreamClient) UpdateDids(dids []string) { 38 j.mu.Lock() 39 j.cfg.WantedDids = dids 40 j.mu.Unlock() 41 j.reconnectCh <- struct{}{} 42} 43 44func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB) (*JetstreamClient, error) { 45 if cfg == nil { 46 cfg = client.DefaultClientConfig() 47 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 48 cfg.WantedCollections = collections 49 } 50 51 return &JetstreamClient{ 52 cfg: cfg, 53 ident: ident, 54 db: db, 55 reconnectCh: make(chan struct{}, 1), 56 }, nil 57} 58 59func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 60 logger := log.FromContext(ctx) 61 62 pf := func(ctx context.Context, e *models.Event) error { 63 err := processFunc(ctx, e) 64 if err != nil { 65 return err 66 } 67 68 if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil { 69 return err 70 } 71 72 return nil 73 } 74 75 sched := sequential.NewScheduler(j.ident, logger, pf) 76 77 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 78 if err != nil { 79 return fmt.Errorf("failed to create jetstream client: %w", err) 80 } 81 j.client = client 82 83 go func() { 84 lastTimeUs := j.getLastTimeUs(ctx) 85 for len(j.cfg.WantedDids) == 0 { 86 time.Sleep(time.Second) 87 } 88 j.connectAndRead(ctx, &lastTimeUs) 89 }() 90 91 return nil 92} 93 94func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) { 95 l := log.FromContext(ctx) 96 for { 97 select { 98 case <-j.reconnectCh: 99 l.Info("(re)connecting jetstream client") 100 j.client.Scheduler.Shutdown() 101 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 102 l.Error("error reading jetstream", "error", err) 103 } 104 default: 105 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 106 l.Error("error reading jetstream", "error", err) 107 } 108 } 109 } 110} 111 112func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 { 113 l := log.FromContext(ctx) 114 lastTimeUs, err := j.db.GetLastTimeUs() 115 if err != nil { 116 l.Warn("couldn't get last time us, starting from now", "error", err) 117 lastTimeUs = time.Now().UnixMicro() 118 err = j.db.SaveLastTimeUs(lastTimeUs) 119 if err != nil { 120 l.Error("failed to save last time us") 121 } 122 } 123 124 // If last time is older than a week, start from now 125 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 126 lastTimeUs = time.Now().UnixMicro() 127 l.Warn("last time us is older than a week. discarding that and starting from now") 128 err = j.db.SaveLastTimeUs(lastTimeUs) 129 if err != nil { 130 l.Error("failed to save last time us") 131 } 132 } 133 134 l.Info("found last time_us", "time_us", lastTimeUs) 135 return lastTimeUs 136}