forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/jetstream/pkg/client"
11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "tangled.sh/tangled.sh/core/log"
14)
15
// DB is the persistence the jetstream client needs for its cursor: the
// time_us value (a timestamp in microseconds) of the last event seen.
type DB interface {
	// GetLastTimeUs returns the stored cursor, or an error if it cannot be read.
	GetLastTimeUs() (timeUs int64, err error)
	// SaveLastTimeUs stores timeUs as the cursor. The client calls it when
	// GetLastTimeUs has failed.
	SaveLastTimeUs(timeUs int64) error
	// UpdateLastTimeUs overwrites the stored cursor with timeUs. The client
	// calls it when the stored cursor is too old to resume from.
	UpdateLastTimeUs(timeUs int64) error
}
21
// JetstreamClient wraps a jetstream client together with the configuration,
// cursor storage and reconnect plumbing used by this package.
type JetstreamClient struct {
	// cfg is the client configuration; AddDid and UpdateDids append to
	// cfg.WantedDids.
	cfg *client.ClientConfig
	// client is the underlying jetstream client. It is nil until
	// StartJetstream has created it.
	client *client.Client
	// ident is passed to the sequential scheduler built in StartJetstream.
	ident string
	// l is the logger handed to the scheduler and used by StartJetstream.
	l *slog.Logger

	// db stores and retrieves the time_us cursor (see getLastTimeUs).
	db DB
	// waitForDid makes StartJetstream's goroutine wait until cfg.WantedDids
	// is non-empty before connecting.
	waitForDid bool
	// mu is held by AddDid and UpdateDids while they append to cfg.WantedDids.
	mu sync.RWMutex

	// cancel cancels the context of the current connection attempt. It is set
	// by connectAndRead and invoked by UpdateDids to force a reconnect.
	cancel context.CancelFunc
	// cancelMu guards cancel.
	cancelMu sync.Mutex
}
35
36func (j *JetstreamClient) AddDid(did string) {
37 if did == "" {
38 return
39 }
40 j.mu.Lock()
41 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
42 j.mu.Unlock()
43}
44
45func (j *JetstreamClient) UpdateDids(dids []string) {
46 j.mu.Lock()
47 for _, did := range dids {
48 if did != "" {
49 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
50 }
51 }
52 j.mu.Unlock()
53
54 j.cancelMu.Lock()
55 if j.cancel != nil {
56 j.cancel()
57 }
58 j.cancelMu.Unlock()
59}
60
61func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) {
62 if cfg == nil {
63 cfg = client.DefaultClientConfig()
64 cfg.WebsocketURL = endpoint
65 cfg.WantedCollections = collections
66 }
67
68 return &JetstreamClient{
69 cfg: cfg,
70 ident: ident,
71 db: db,
72 l: logger,
73
74 // This will make the goroutine in StartJetstream wait until
75 // cfg.WantedDids has been populated, typically using UpdateDids.
76 waitForDid: waitForDid,
77 }, nil
78}
79
80// StartJetstream starts the jetstream client and processes events using the provided processFunc.
81// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
82func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
83 logger := j.l
84
85 sched := sequential.NewScheduler(j.ident, logger, processFunc)
86
87 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
88 if err != nil {
89 return fmt.Errorf("failed to create jetstream client: %w", err)
90 }
91 j.client = client
92
93 go func() {
94 if j.waitForDid {
95 for len(j.cfg.WantedDids) == 0 {
96 time.Sleep(time.Second)
97 }
98 }
99 logger.Info("done waiting for did")
100 j.connectAndRead(ctx)
101 }()
102
103 return nil
104}
105
106func (j *JetstreamClient) connectAndRead(ctx context.Context) {
107 l := log.FromContext(ctx)
108 for {
109 cursor := j.getLastTimeUs(ctx)
110
111 connCtx, cancel := context.WithCancel(ctx)
112 j.cancelMu.Lock()
113 j.cancel = cancel
114 j.cancelMu.Unlock()
115
116 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
117 l.Error("error reading jetstream", "error", err)
118 cancel()
119 continue
120 }
121
122 select {
123 case <-ctx.Done():
124 l.Info("context done, stopping jetstream")
125 return
126 case <-connCtx.Done():
127 l.Info("connection context done, reconnecting")
128 continue
129 }
130 }
131}
132
133func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
134 l := log.FromContext(ctx)
135 lastTimeUs, err := j.db.GetLastTimeUs()
136 if err != nil {
137 l.Warn("couldn't get last time us, starting from now", "error", err)
138 lastTimeUs = time.Now().UnixMicro()
139 err = j.db.SaveLastTimeUs(lastTimeUs)
140 if err != nil {
141 l.Error("failed to save last time us", "error", err)
142 }
143 }
144
145 // If last time is older than a week, start from now
146 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
147 lastTimeUs = time.Now().UnixMicro()
148 l.Warn("last time us is older than 2 days; discarding that and starting from now")
149 err = j.db.UpdateLastTimeUs(lastTimeUs)
150 if err != nil {
151 l.Error("failed to save last time us", "error", err)
152 }
153 }
154
155 l.Info("found last time_us", "time_us", lastTimeUs)
156 return &lastTimeUs
157}