forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "sync"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 "tangled.sh/tangled.sh/core/log"
17)
18
type (
	// DB stores the jetstream resume cursor: a time_us value expressed in
	// unix microseconds.
	DB interface {
		// GetLastTimeUs returns the stored cursor.
		GetLastTimeUs() (int64, error)
		// SaveLastTimeUs replaces the stored cursor.
		SaveLastTimeUs(int64) error
	}

	// Set is a set of comparable values, represented as a map whose values
	// are zero-size.
	Set[T comparable] map[T]struct{}
)
25
// JetstreamClient reads events from a jetstream endpoint, optionally
// restricted to a set of DIDs, and persists a resume cursor through DB.
type JetstreamClient struct {
	cfg    *client.ClientConfig // settings handed to client.NewClient
	client *client.Client       // set by StartJetstream
	ident  string               // scheduler name passed to sequential.NewScheduler
	l      *slog.Logger         // logger used by AddDid, StartJetstream and saveIfKilled

	logDids    bool         // when true, AddDid logs every DID it adds
	wantedDids Set[string]  // DID filter; an empty set means all DIDs are allowed
	db         DB           // cursor (time_us) storage
	waitForDid bool         // when true, StartJetstream waits for a DID before connecting
	mu         sync.RWMutex // guards wantedDids; AddDid takes the write lock

	cancel   context.CancelFunc // cancels the current connection's context (set in connectAndRead)
	cancelMu sync.Mutex         // guards cancel
}
41
42func (j *JetstreamClient) AddDid(did string) {
43 if did == "" {
44 return
45 }
46
47 if j.logDids {
48 j.l.Info("adding did to in-memory filter", "did", did)
49 }
50 j.mu.Lock()
51 j.wantedDids[did] = struct{}{}
52 j.mu.Unlock()
53}
54
55type processor func(context.Context, *models.Event) error
56
57func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
58 // empty filter => all dids allowed
59 if len(j.wantedDids) == 0 {
60 return processFunc
61 }
62 // since this closure references j.WantedDids; it should auto-update
63 // existing instances of the closure when j.WantedDids is mutated
64 return func(ctx context.Context, evt *models.Event) error {
65 if _, ok := j.wantedDids[evt.Did]; ok {
66 return processFunc(ctx, evt)
67 } else {
68 return nil
69 }
70 }
71}
72
73func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid, logDids bool) (*JetstreamClient, error) {
74 if cfg == nil {
75 cfg = client.DefaultClientConfig()
76 cfg.WebsocketURL = endpoint
77 cfg.WantedCollections = collections
78 }
79
80 return &JetstreamClient{
81 cfg: cfg,
82 ident: ident,
83 db: db,
84 l: logger,
85 wantedDids: make(map[string]struct{}),
86
87 logDids: logDids,
88
89 // This will make the goroutine in StartJetstream wait until
90 // j.wantedDids has been populated, typically using addDids.
91 waitForDid: waitForDid,
92 }, nil
93}
94
95// StartJetstream starts the jetstream client and processes events using the provided processFunc.
96// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs).
97func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
98 logger := j.l
99
100 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc))
101
102 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
103 if err != nil {
104 return fmt.Errorf("failed to create jetstream client: %w", err)
105 }
106 j.client = client
107
108 go func() {
109 if j.waitForDid {
110 for len(j.wantedDids) == 0 {
111 time.Sleep(time.Second)
112 }
113 }
114 logger.Info("done waiting for did")
115
116 go j.periodicLastTimeSave(ctx)
117 j.saveIfKilled(ctx)
118
119 j.connectAndRead(ctx)
120 }()
121
122 return nil
123}
124
125func (j *JetstreamClient) connectAndRead(ctx context.Context) {
126 l := log.FromContext(ctx)
127 for {
128 cursor := j.getLastTimeUs(ctx)
129
130 connCtx, cancel := context.WithCancel(ctx)
131 j.cancelMu.Lock()
132 j.cancel = cancel
133 j.cancelMu.Unlock()
134
135 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
136 l.Error("error reading jetstream", "error", err)
137 cancel()
138 continue
139 }
140
141 select {
142 case <-ctx.Done():
143 l.Info("context done, stopping jetstream")
144 return
145 case <-connCtx.Done():
146 l.Info("connection context done, reconnecting")
147 continue
148 }
149 }
150}
151
152// save cursor periodically
153func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) {
154 ticker := time.NewTicker(time.Minute)
155 defer ticker.Stop()
156
157 for {
158 select {
159 case <-ctx.Done():
160 return
161 case <-ticker.C:
162 j.db.SaveLastTimeUs(time.Now().UnixMicro())
163 }
164 }
165}
166
167func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
168 l := log.FromContext(ctx)
169 lastTimeUs, err := j.db.GetLastTimeUs()
170 if err != nil {
171 l.Warn("couldn't get last time us, starting from now", "error", err)
172 lastTimeUs = time.Now().UnixMicro()
173 err = j.db.SaveLastTimeUs(lastTimeUs)
174 if err != nil {
175 l.Error("failed to save last time us", "error", err)
176 }
177 }
178
179 // If last time is older than 2 days, start from now
180 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
181 lastTimeUs = time.Now().UnixMicro()
182 l.Warn("last time us is older than 2 days; discarding that and starting from now")
183 err = j.db.SaveLastTimeUs(lastTimeUs)
184 if err != nil {
185 l.Error("failed to save last time us", "error", err)
186 }
187 }
188
189 l.Info("found last time_us", "time_us", lastTimeUs)
190 return &lastTimeUs
191}
192
193func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context {
194 ctxWithCancel, cancel := context.WithCancel(ctx)
195
196 sigChan := make(chan os.Signal, 1)
197
198 signal.Notify(sigChan,
199 syscall.SIGINT,
200 syscall.SIGTERM,
201 syscall.SIGQUIT,
202 syscall.SIGHUP,
203 syscall.SIGKILL,
204 syscall.SIGSTOP,
205 )
206
207 go func() {
208 sig := <-sigChan
209 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig)
210
211 lastTimeUs := time.Now().UnixMicro()
212 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
213 j.l.Error("Failed to save last time during shutdown", "error", err)
214 }
215 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs)
216
217 j.cancelMu.Lock()
218 if j.cancel != nil {
219 j.cancel()
220 }
221 j.cancelMu.Unlock()
222
223 cancel()
224
225 os.Exit(0)
226 }()
227
228 return ctxWithCancel
229}