A locally focused Bluesky AppView
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "sync"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/cmd/relay/stream"
13 "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
14 jsclient "github.com/bluesky-social/jetstream/pkg/client"
15 jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/gorilla/websocket"
18)
19
type (
	// SyncConfig lists the upstream event sources the sync engine
	// subscribes to.
	SyncConfig struct {
		Backends []SyncBackend `json:"backends"`
	}

	// SyncBackend describes a single upstream event source.
	SyncBackend struct {
		// Type selects the protocol; StartSyncEngine accepts
		// "firehose" and "jetstream".
		Type string `json:"type"`
		// Host is the hostname the websocket URL is built from.
		Host string `json:"host"`
		// MaxWorkers caps parallel event processing; zero selects the
		// sync loops' built-in default.
		MaxWorkers int `json:"max_workers,omitempty"`
	}
)
29
// StartSyncEngine starts one background ingestion loop per backend in sc
// and then blocks until ctx is cancelled.
//
// Backend types "firehose" and "jetstream" are supported. Every backend is
// validated before any loop is launched, so an unrecognized type is
// reported without leaving earlier backends running in the background.
// The returned error is always non-nil: either the configuration error, or
// "exiting sync routine" once ctx is done. The started loops are not
// waited for; they observe ctx themselves.
func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error {
	// Resolve every backend first so a bad entry fails the whole config.
	runners := make([]func(context.Context, SyncBackend), len(sc.Backends))
	for i, be := range sc.Backends {
		switch be.Type {
		case "firehose":
			runners[i] = s.runSyncFirehose
		case "jetstream":
			runners[i] = s.runSyncJetstream
		default:
			return fmt.Errorf("unrecognized sync backend type: %q", be.Type)
		}
	}

	// The go statement evaluates its arguments immediately, so each loop
	// gets its own copy of the backend regardless of Go version.
	for i, be := range sc.Backends {
		go runners[i](ctx, be)
	}

	<-ctx.Done()
	return fmt.Errorf("exiting sync routine")
}
45
// failureTimeInterval is the minimum lifetime of a connection for it to be
// considered healthy; shorter sessions count as consecutive failures and
// trigger a back-off before reconnecting.
const failureTimeInterval = 5 * time.Second
47
// runSyncFirehose keeps a subscribeRepos firehose connection to be.Host
// alive until ctx is cancelled.
//
// Each attempt resumes from the sequence number stored for the host via
// loadLastSeq (0 if it cannot be loaded). A session that lasted longer than
// failureTimeInterval resets the failure count and reconnects immediately;
// shorter sessions are retried after delayForFailureCount's back-off.
func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) {
	maxWorkers := 10
	if be.MaxWorkers != 0 {
		maxWorkers = be.MaxWorkers
	}

	var failures int
	for {
		// Without this check the loop would reconnect forever after
		// shutdown.
		if ctx.Err() != nil {
			return
		}

		seqno, err := loadLastSeq(s.db, be.Host)
		if err != nil {
			slog.Warn("failed to load sequence number, starting over", "host", be.Host, "error", err)
			seqno = 0
		}

		start := time.Now()
		if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil {
			slog.Error("firehose connection lost", "host", be.Host, "error", err)
		}

		if time.Since(start) > failureTimeInterval {
			failures = 0
			continue
		}
		failures++

		delay := delayForFailureCount(failures)
		slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay)

		// Actually wait out the back-off (so a failing relay isn't hammered),
		// but wake immediately if ctx is cancelled.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}
78
// runSyncJetstream keeps a Jetstream connection to be.Host alive until ctx
// is cancelled.
//
// Each attempt resumes from the cursor stored for the host via loadLastSeq
// (the same table used for firehose sequence numbers); a cursor of 0 makes
// startJetstreamTail start from live. A session that lasted longer than
// failureTimeInterval resets the failure count and reconnects immediately;
// shorter sessions are retried after delayForFailureCount's back-off.
func (s *Server) runSyncJetstream(ctx context.Context, be SyncBackend) {
	maxWorkers := 10
	if be.MaxWorkers != 0 {
		maxWorkers = be.MaxWorkers
	}

	var failures int
	for {
		// Stop reconnecting once the engine is shutting down.
		if ctx.Err() != nil {
			return
		}

		cursor, err := loadLastSeq(s.db, be.Host)
		if err != nil {
			slog.Warn("failed to load jetstream cursor, starting from live", "host", be.Host, "error", err)
			cursor = 0
		}

		start := time.Now()
		if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil {
			slog.Error("jetstream connection lost", "host", be.Host, "error", err)
		}

		if time.Since(start) > failureTimeInterval {
			failures = 0
			continue
		}
		failures++

		delay := delayForFailureCount(failures)
		slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay)

		// Unlike time.Sleep, this wait is interrupted by cancellation.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}
112
113func delayForFailureCount(n int) time.Duration {
114 if n < 5 {
115 return (time.Second * 5) + (time.Second * 2 * time.Duration(n))
116 }
117
118 return time.Second * 30
119}
120
// startLiveTail subscribes to com.atproto.sync.subscribeRepos on the relay at
// host, starting at cursor curs, and dispatches commit events to
// s.backend.HandleEvent through indigo's parallel scheduler (parWorkers
// workers; maxQ is passed straight to parallel.NewScheduler).
//
// While running it tracks the highest commit seq in s.lastSeq and
// checkpoints it with storeLastSeq roughly every 1000 sequence numbers. A
// watchdog closes the connection if no commit arrives for 30 seconds. It
// returns the dial error, or whatever stream.HandleRepoStream returns when
// the stream ends.
func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error {
	// Cancelling on return also stops the watchdog goroutine below.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// A connection with no commits for this long is presumed dead.
	const idleTimeout = 30 * time.Second

	slog.Info("starting live tail", "host", host, "cursor", curs)

	urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs)

	// DialContext (unlike Dial) lets ctx cancellation abort a stuck handshake.
	con, _, err := websocket.DefaultDialer.DialContext(ctx, urlStr, http.Header{
		"User-Agent": []string{"konbini/0.0.1"},
	})
	if err != nil {
		return fmt.Errorf("failed to connect to relay: %w", err)
	}
	// Release the socket on every return path. The watchdog may already
	// have closed it; a second Close only yields an ignored error.
	defer con.Close()

	var lelk sync.Mutex
	lastEvent := time.Now()

	go func() {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				lelk.Lock()
				let := lastEvent
				lelk.Unlock()

				if time.Since(let) > idleTimeout {
					slog.Error("firehose connection timed out", "host", host)
					// Closing the socket makes the stream reader fail so
					// the caller can reconnect.
					con.Close()
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	var cclk sync.Mutex
	var completeCursor int64

	// Last seq persisted during this session; guarded by s.seqLk.
	var lastStored int64

	rsc := &stream.RepoStreamCallbacks{
		RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
			// NOTE(review): handlers get a background context, so in-flight
			// events are not cancelled when this tail exits — confirm this
			// is intended.
			ctx := context.Background()

			firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq))

			s.seqLk.Lock()
			if evt.Seq > s.lastSeq {
				s.lastSeq = evt.Seq

				// Checkpoint once at least 1000 seqs have passed. Commits
				// are handled in parallel, and seqs are also consumed by
				// event types this callback never sees, so an exact
				// evt.Seq%1000 == 0 test can miss checkpoints indefinitely.
				if evt.Seq-lastStored >= 1000 {
					if err := storeLastSeq(s.db, host, evt.Seq); err != nil {
						slog.Error("failed to store seqno", "host", host, "error", err)
					}
					lastStored = evt.Seq
				}
			}
			s.seqLk.Unlock()

			lelk.Lock()
			lastEvent = time.Now()
			lelk.Unlock()

			if err := s.backend.HandleEvent(ctx, evt); err != nil {
				return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err)
			}

			// Highest seq whose handler has finished.
			cclk.Lock()
			if evt.Seq > completeCursor {
				completeCursor = evt.Seq
				firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq))
			}
			cclk.Unlock()

			return nil
		},
		RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error {
			return nil
		},
		// TODO: all the other event types
		Error: func(errf *stream.ErrorFrame) error {
			return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
		},
	}

	sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler)

	return stream.HandleRepoStream(ctx, con, sched, slog.Default())
}
214
// startJetstreamTail reads from the Jetstream instance at host, resuming at
// cursor (an event TimeUS value; 0 starts from live), and passes commit
// events to s.backend.HandleEventJetstream through a parallel scheduler with
// parWorkers workers.
//
// The highest TimeUS seen is tracked in s.lastSeq and checkpointed with
// storeLastSeq whenever it has advanced by more than 1,000,000 since the
// last store. It returns a client-construction error or whatever
// ConnectAndRead returns.
func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor int64, parWorkers int) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	slog.Info("starting jetstream tail", "host", host, "cursor", cursor)

	// Last TimeUS persisted during this session; guarded by s.seqLk.
	var lastStored int64

	// NOTE(review): s.lastSeq is shared with the firehose backend, whose
	// sequence numbers are presumably far smaller than TimeUS values; running
	// both backends at once would likely stall firehose checkpointing.
	sched := jsparallel.NewScheduler(
		parWorkers,
		host,
		slog.Default(),
		func(ctx context.Context, event *models.Event) error {
			s.seqLk.Lock()
			if event.TimeUS > s.lastSeq {
				s.lastSeq = event.TimeUS
				// Periodic checkpoint rather than a DB write per event.
				if event.TimeUS-lastStored > 1_000_000 {
					if err := storeLastSeq(s.db, host, event.TimeUS); err != nil {
						slog.Error("failed to store jetstream cursor", "error", err)
					}
					lastStored = event.TimeUS
				}
			}
			s.seqLk.Unlock()

			firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS))

			// Only commit events are forwarded; other kinds just advance
			// the cursor.
			if event.Commit != nil {
				if err := s.backend.HandleEventJetstream(ctx, event); err != nil {
					return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err)
				}

				firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS))
			}

			return nil
		},
	)
	// Every call creates a fresh scheduler; stop its workers once reading has
	// ended so reconnects don't accumulate goroutines.
	defer sched.Shutdown()

	config := jsclient.DefaultClientConfig()
	config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host)

	// A nil cursor means "start from live".
	var cursorPtr *int64
	if cursor > 0 {
		cursorPtr = &cursor
	}

	client, err := jsclient.NewClient(config, slog.Default(), sched)
	if err != nil {
		return fmt.Errorf("create jetstream client: %w", err)
	}

	return client.ConnectAndRead(ctx, cursorPtr)
}