forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "sync"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 "tangled.sh/tangled.sh/core/log"
17)
18
// DB is the persistence layer JetstreamClient uses for its cursor. The
// cursor is a timestamp in microseconds (the values this package stores
// come from time.Now().UnixMicro()).
type DB interface {
	// GetLastTimeUs returns the most recently stored cursor.
	GetLastTimeUs() (int64, error)
	// SaveLastTimeUs stores lastTimeUs as the new cursor.
	SaveLastTimeUs(lastTimeUs int64) error
}
23
// Set is a collection of unique values of type T, stored as the keys of a
// map whose values are zero-width empty structs.
type Set[T comparable] map[T]struct{}
25
26type JetstreamClient struct {
27 cfg *client.ClientConfig
28 client *client.Client
29 ident string
30 l *slog.Logger
31
32 logDids bool
33 wantedDids Set[string]
34 db DB
35 waitForDid bool
36 mu sync.RWMutex
37
38 cancel context.CancelFunc
39 cancelMu sync.Mutex
40}
41
42func (j *JetstreamClient) AddDid(did string) {
43 if did == "" {
44 return
45 }
46
47 if j.logDids {
48 j.l.Info("adding did to in-memory filter", "did", did)
49 }
50 j.mu.Lock()
51 j.wantedDids[did] = struct{}{}
52 j.mu.Unlock()
53}
54
55func (j *JetstreamClient) RemoveDid(did string) {
56 if did == "" {
57 return
58 }
59
60 if j.logDids {
61 j.l.Info("removing did from in-memory filter", "did", did)
62 }
63 j.mu.Lock()
64 delete(j.wantedDids, did)
65 j.mu.Unlock()
66}
67
68type processor func(context.Context, *models.Event) error
69
70func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
71 // since this closure references j.WantedDids; it should auto-update
72 // existing instances of the closure when j.WantedDids is mutated
73 return func(ctx context.Context, evt *models.Event) error {
74
75 // empty filter => all dids allowed
76 if len(j.wantedDids) == 0 {
77 return processFunc(ctx, evt)
78 }
79
80 if _, ok := j.wantedDids[evt.Did]; ok {
81 return processFunc(ctx, evt)
82 } else {
83 return nil
84 }
85 }
86}
87
88func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid, logDids bool) (*JetstreamClient, error) {
89 if cfg == nil {
90 cfg = client.DefaultClientConfig()
91 cfg.WebsocketURL = endpoint
92 cfg.WantedCollections = collections
93 }
94
95 return &JetstreamClient{
96 cfg: cfg,
97 ident: ident,
98 db: db,
99 l: logger,
100 wantedDids: make(map[string]struct{}),
101
102 logDids: logDids,
103
104 // This will make the goroutine in StartJetstream wait until
105 // j.wantedDids has been populated, typically using addDids.
106 waitForDid: waitForDid,
107 }, nil
108}
109
110// StartJetstream starts the jetstream client and processes events using the provided processFunc.
111// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs).
112func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
113 logger := j.l
114
115 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc))
116
117 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
118 if err != nil {
119 return fmt.Errorf("failed to create jetstream client: %w", err)
120 }
121 j.client = client
122
123 go func() {
124 if j.waitForDid {
125 for len(j.wantedDids) == 0 {
126 time.Sleep(time.Second)
127 }
128 }
129 logger.Info("done waiting for did")
130
131 go j.periodicLastTimeSave(ctx)
132 j.saveIfKilled(ctx)
133
134 j.connectAndRead(ctx)
135 }()
136
137 return nil
138}
139
140func (j *JetstreamClient) connectAndRead(ctx context.Context) {
141 l := log.FromContext(ctx)
142 for {
143 cursor := j.getLastTimeUs(ctx)
144
145 connCtx, cancel := context.WithCancel(ctx)
146 j.cancelMu.Lock()
147 j.cancel = cancel
148 j.cancelMu.Unlock()
149
150 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
151 l.Error("error reading jetstream", "error", err)
152 cancel()
153 continue
154 }
155
156 select {
157 case <-ctx.Done():
158 l.Info("context done, stopping jetstream")
159 return
160 case <-connCtx.Done():
161 l.Info("connection context done, reconnecting")
162 continue
163 }
164 }
165}
166
167// save cursor periodically
168func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) {
169 ticker := time.NewTicker(time.Minute)
170 defer ticker.Stop()
171
172 for {
173 select {
174 case <-ctx.Done():
175 return
176 case <-ticker.C:
177 j.db.SaveLastTimeUs(time.Now().UnixMicro())
178 }
179 }
180}
181
182func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
183 l := log.FromContext(ctx)
184 lastTimeUs, err := j.db.GetLastTimeUs()
185 if err != nil {
186 l.Warn("couldn't get last time us, starting from now", "error", err)
187 lastTimeUs = time.Now().UnixMicro()
188 err = j.db.SaveLastTimeUs(lastTimeUs)
189 if err != nil {
190 l.Error("failed to save last time us", "error", err)
191 }
192 }
193
194 // If last time is older than 2 days, start from now
195 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
196 lastTimeUs = time.Now().UnixMicro()
197 l.Warn("last time us is older than 2 days; discarding that and starting from now")
198 err = j.db.SaveLastTimeUs(lastTimeUs)
199 if err != nil {
200 l.Error("failed to save last time us", "error", err)
201 }
202 }
203
204 l.Info("found last time_us", "time_us", lastTimeUs)
205 return &lastTimeUs
206}
207
208func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context {
209 ctxWithCancel, cancel := context.WithCancel(ctx)
210
211 sigChan := make(chan os.Signal, 1)
212
213 signal.Notify(sigChan,
214 syscall.SIGINT,
215 syscall.SIGTERM,
216 syscall.SIGQUIT,
217 syscall.SIGHUP,
218 syscall.SIGKILL,
219 syscall.SIGSTOP,
220 )
221
222 go func() {
223 sig := <-sigChan
224 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig)
225
226 lastTimeUs := time.Now().UnixMicro()
227 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
228 j.l.Error("Failed to save last time during shutdown", "error", err)
229 }
230 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs)
231
232 j.cancelMu.Lock()
233 if j.cancel != nil {
234 j.cancel()
235 }
236 j.cancelMu.Unlock()
237
238 cancel()
239
240 os.Exit(0)
241 }()
242
243 return ctxWithCancel
244}