Monorepo for Tangled
tangled.org
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/bluesky-social/jetstream/pkg/client"
10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/sotangled/tangled/log"
13)
14
15type DB interface {
16 GetLastTimeUs() (int64, error)
17 SaveLastTimeUs(int64) error
18}
19
20type JetstreamClient struct {
21 cfg *client.ClientConfig
22 client *client.Client
23 ident string
24
25 db DB
26 reconnectCh chan struct{}
27 mu sync.RWMutex
28}
29
30func (j *JetstreamClient) AddDid(did string) {
31 j.mu.Lock()
32 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
33 j.mu.Unlock()
34 j.reconnectCh <- struct{}{}
35}
36
37func (j *JetstreamClient) UpdateDids(dids []string) {
38 j.mu.Lock()
39 j.cfg.WantedDids = dids
40 j.mu.Unlock()
41 j.reconnectCh <- struct{}{}
42}
43
44func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB) (*JetstreamClient, error) {
45 if cfg == nil {
46 cfg = client.DefaultClientConfig()
47 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
48 cfg.WantedCollections = collections
49 }
50
51 return &JetstreamClient{
52 cfg: cfg,
53 ident: ident,
54 db: db,
55 reconnectCh: make(chan struct{}, 1),
56 }, nil
57}
58
59func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
60 logger := log.FromContext(ctx)
61
62 pf := func(ctx context.Context, e *models.Event) error {
63 err := processFunc(ctx, e)
64 if err != nil {
65 return err
66 }
67
68 if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil {
69 return err
70 }
71
72 return nil
73 }
74
75 sched := sequential.NewScheduler(j.ident, logger, pf)
76
77 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
78 if err != nil {
79 return fmt.Errorf("failed to create jetstream client: %w", err)
80 }
81 j.client = client
82
83 go func() {
84 lastTimeUs := j.getLastTimeUs(ctx)
85 for len(j.cfg.WantedDids) == 0 {
86 time.Sleep(time.Second)
87 }
88 j.connectAndRead(ctx, &lastTimeUs)
89 }()
90
91 return nil
92}
93
94func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {
95 l := log.FromContext(ctx)
96 for {
97 select {
98 case <-j.reconnectCh:
99 l.Info("(re)connecting jetstream client")
100 j.client.Scheduler.Shutdown()
101 if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
102 l.Error("error reading jetstream", "error", err)
103 }
104 default:
105 if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
106 l.Error("error reading jetstream", "error", err)
107 }
108 }
109 }
110}
111
112func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {
113 l := log.FromContext(ctx)
114 lastTimeUs, err := j.db.GetLastTimeUs()
115 if err != nil {
116 l.Warn("couldn't get last time us, starting from now", "error", err)
117 lastTimeUs = time.Now().UnixMicro()
118 err = j.db.SaveLastTimeUs(lastTimeUs)
119 if err != nil {
120 l.Error("failed to save last time us")
121 }
122 }
123
124 // If last time is older than a week, start from now
125 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
126 lastTimeUs = time.Now().UnixMicro()
127 l.Warn("last time us is older than a week. discarding that and starting from now")
128 err = j.db.SaveLastTimeUs(lastTimeUs)
129 if err != nil {
130 l.Error("failed to save last time us")
131 }
132 }
133
134 l.Info("found last time_us", "time_us", lastTimeUs)
135 return lastTimeUs
136}