Monorepo for Tangled tangled.org
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/bluesky-social/jetstream/pkg/client" 10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/sotangled/tangled/log" 13) 14 15type DB interface { 16 GetLastTimeUs() (int64, error) 17 SaveLastTimeUs(int64) error 18} 19 20type JetstreamClient struct { 21 cfg *client.ClientConfig 22 client *client.Client 23 ident string 24 25 db DB 26 reconnectCh chan struct{} 27 waitForDid bool 28 mu sync.RWMutex 29} 30 31func (j *JetstreamClient) AddDid(did string) { 32 j.mu.Lock() 33 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 34 j.mu.Unlock() 35 j.reconnectCh <- struct{}{} 36} 37 38func (j *JetstreamClient) UpdateDids(dids []string) { 39 j.mu.Lock() 40 j.cfg.WantedDids = dids 41 j.mu.Unlock() 42 j.reconnectCh <- struct{}{} 43} 44 45func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) { 46 if cfg == nil { 47 cfg = client.DefaultClientConfig() 48 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 49 cfg.WantedCollections = collections 50 } 51 52 return &JetstreamClient{ 53 cfg: cfg, 54 ident: ident, 55 db: db, 56 57 // This will make the goroutine in StartJetstream wait until 58 // cfg.WantedDids has been populated, typically using UpdateDids. 59 waitForDid: waitForDid, 60 reconnectCh: make(chan struct{}, 1), 61 }, nil 62} 63 64// StartJetstream starts the jetstream client and processes events using the provided processFunc. 65// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs). 66func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 67 logger := log.FromContext(ctx) 68 69 sched := sequential.NewScheduler(j.ident, logger, processFunc) 70 71 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 72 if err != nil { 73 return fmt.Errorf("failed to create jetstream client: %w", err) 74 } 75 j.client = client 76 77 go func() { 78 lastTimeUs := j.getLastTimeUs(ctx) 79 if j.waitForDid { 80 for len(j.cfg.WantedDids) == 0 { 81 time.Sleep(time.Second) 82 } 83 } 84 logger.Info("done waiting for did") 85 j.connectAndRead(ctx, &lastTimeUs) 86 }() 87 88 return nil 89} 90 91func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) { 92 l := log.FromContext(ctx) 93 for { 94 select { 95 case <-j.reconnectCh: 96 l.Info("(re)connecting jetstream client") 97 j.client.Scheduler.Shutdown() 98 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 99 l.Error("error reading jetstream", "error", err) 100 } 101 default: 102 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 103 l.Error("error reading jetstream", "error", err) 104 } 105 } 106 } 107} 108 109func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 { 110 l := log.FromContext(ctx) 111 lastTimeUs, err := j.db.GetLastTimeUs() 112 if err != nil { 113 l.Warn("couldn't get last time us, starting from now", "error", err) 114 lastTimeUs = time.Now().UnixMicro() 115 err = j.db.SaveLastTimeUs(lastTimeUs) 116 if err != nil { 117 l.Error("failed to save last time us") 118 } 119 } 120 121 // If last time is older than a week, start from now 122 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 123 lastTimeUs = time.Now().UnixMicro() 124 l.Warn("last time us is older than a week. discarding that and starting from now") 125 err = j.db.SaveLastTimeUs(lastTimeUs) 126 if err != nil { 127 l.Error("failed to save last time us") 128 } 129 } 130 131 l.Info("found last time_us", "time_us", lastTimeUs) 132 return lastTimeUs 133}