A locally focused bluesky appview

add support for jetstream backend and multiple sync backends

+38
README.md
··· 201 201 202 202 It will take a minute but it should pull all records from that user. 203 203 204 + ## Upstream Firehose Configuration 205 + 206 + Konbini supports both standard firehose endpoints as well as jetstream. If 207 + bandwidth and CPU usage is a concern, and you trust the jetstream endpoint, 208 + then it may be worth trying that out. 209 + 210 + The configuration file is formatted as follows: 211 + 212 + ```json 213 + { 214 + "backends": [ 215 + { 216 + "type": "jetstream", 217 + "host": "jetstream1.us-west.bsky.network" 218 + } 219 + ] 220 + } 221 + ``` 222 + 223 + The default (implicit) configuration file looks like this: 224 + 225 + ```json 226 + { 227 + "backends": [ 228 + { 229 + "type": "firehose", 230 + "host": "bsky.network" 231 + } 232 + ] 233 + } 234 + ``` 235 + 236 + Note that this is an array of backends, you can specify multiple upstreams, and 237 + konbini will read from all of them. The main intended purpose of this is to be 238 + able to subscribe directly to PDSs. PDSs currently only support the full 239 + firehose endpoint, not jetstream, so be sure to specify a type of "firehose" 240 + for individual PDS endpoints. 241 + 204 242 ## License 205 243 206 244 MIT (whyrusleeping)
+68
backend/events.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "log/slog" 8 9 "strings" ··· 11 12 "github.com/bluesky-social/indigo/api/atproto" 12 13 "github.com/bluesky-social/indigo/api/bsky" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 + lexutil "github.com/bluesky-social/indigo/lex/util" 14 16 "github.com/bluesky-social/indigo/repo" 17 + jsmodels "github.com/bluesky-social/jetstream/pkg/models" 15 18 "github.com/ipfs/go-cid" 16 19 "github.com/jackc/pgx/v5/pgconn" 17 20 "github.com/prometheus/client_golang/prometheus" ··· 63 66 return fmt.Errorf("failed to update rev: %w", err) 64 67 } 65 68 */ 69 + 70 + return nil 71 + } 72 + 73 + func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) { 74 + val, err := lexutil.NewFromType(evt.Commit.Collection) 75 + if err != nil { 76 + return nil, fmt.Errorf("failed to load event record type: %w", err) 77 + } 78 + 79 + if err := json.Unmarshal(evt.Commit.Record, val); err != nil { 80 + return nil, err 81 + } 82 + 83 + cval, ok := val.(lexutil.CBOR) 84 + if !ok { 85 + return nil, fmt.Errorf("decoded type was not cbor marshalable") 86 + } 87 + 88 + buf := new(bytes.Buffer) 89 + if err := cval.MarshalCBOR(buf); err != nil { 90 + return nil, fmt.Errorf("failed to marshal event to cbor: %w", err) 91 + } 92 + 93 + rec := buf.Bytes() 94 + return rec, nil 95 + } 96 + 97 + func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error { 98 + 99 + path := evt.Commit.Collection + "/" + evt.Commit.RKey 100 + switch evt.Commit.Operation { 101 + case jsmodels.CommitOperationCreate: 102 + rec, err := cborBytesFromEvent(evt) 103 + if err != nil { 104 + return err 105 + } 106 + 107 + c, err := cid.Decode(evt.Commit.CID) 108 + if err != nil { 109 + return err 110 + } 111 + 112 + if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 113 + return fmt.Errorf("create record failed: %w", err) 114 + } 115 + case jsmodels.CommitOperationUpdate: 116 + rec, err := cborBytesFromEvent(evt) 117 + if err != nil { 118 + return err 119 + } 120 + 121 + c, err := cid.Decode(evt.Commit.CID) 122 + if err != nil { 123 + return err 124 + } 125 + 126 + if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 127 + return fmt.Errorf("update record failed: %w", err) 128 + } 129 + case jsmodels.CommitOperationDelete: 130 + if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil { 131 + return fmt.Errorf("delete record failed: %w", err) 132 + } 133 + } 66 134 67 135 return nil 68 136 }
+6 -5
go.mod
··· 3 3 go 1.25.1 4 4 5 5 require ( 6 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f 6 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe 7 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 7 8 github.com/gorilla/websocket v1.5.1 8 9 github.com/hashicorp/golang-lru/v2 v2.0.7 9 10 github.com/ipfs/go-cid v0.4.1 ··· 60 61 github.com/ipfs/go-metrics-interface v0.0.1 // indirect 61 62 github.com/ipfs/go-peertaskqueue v0.8.1 // indirect 62 63 github.com/ipfs/go-verifcid v0.0.3 // indirect 63 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect 64 + github.com/ipld/go-car v0.6.2 // indirect 64 65 github.com/ipld/go-codec-dagpb v1.6.0 // indirect 65 66 github.com/ipld/go-ipld-prime v0.21.0 // indirect 66 67 github.com/jackc/pgpassfile v1.0.0 // indirect ··· 69 70 github.com/jbenet/goprocess v0.1.4 // indirect 70 71 github.com/jinzhu/inflection v1.0.0 // indirect 71 72 github.com/jinzhu/now v1.1.5 // indirect 72 - github.com/klauspost/compress v1.17.3 // indirect 73 + github.com/klauspost/compress v1.17.9 // indirect 73 74 github.com/klauspost/cpuid/v2 v2.2.7 // indirect 74 75 github.com/lestrrat-go/blackmagic v1.0.1 // indirect 75 76 github.com/lestrrat-go/httpcc v1.0.1 // indirect ··· 91 92 github.com/orandin/slog-gorm v1.3.2 // indirect 92 93 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 93 94 github.com/prometheus/client_model v0.6.1 // indirect 94 - github.com/prometheus/common v0.48.0 // indirect 95 - github.com/prometheus/procfs v0.12.0 // indirect 95 + github.com/prometheus/common v0.54.0 // indirect 96 + github.com/prometheus/procfs v0.15.1 // indirect 96 97 github.com/redis/go-redis/v9 v9.3.0 // indirect 97 98 github.com/russross/blackfriday/v2 v2.1.0 // indirect 98 99 github.com/segmentio/asm v1.2.0 // indirect
+12 -10
go.sum
··· 6 6 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 7 7 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= 8 8 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 9 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f h1:FugOoTzh0nCMTWGqNGsjttFWVPcwxaaGD3p/nE9V8qY= 10 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8= 9 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo= 10 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck= 11 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU= 12 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs= 11 13 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= 12 14 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= 13 15 github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= ··· 158 160 github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= 159 161 github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs= 160 162 github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw= 161 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI= 162 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA= 163 + github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= 164 + github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= 163 165 github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= 164 166 github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= 165 167 github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= ··· 188 190 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 189 191 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 190 192 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= 191 - github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= 192 - github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= 193 + github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= 194 + github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 193 195 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 194 196 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 195 197 github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8= ··· 312 314 github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= 313 315 github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 314 316 github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= 315 - github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= 316 - github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= 317 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 318 - github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 317 + github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= 318 + github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= 319 + github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= 320 + github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 319 321 github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= 320 322 github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= 321 323 github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+1 -1
hydration/post.go
··· 60 60 if dbPost.NotFound || len(dbPost.Raw) == 0 { 61 61 if autoFetch { 62 62 h.AddMissingRecord(uri, true) 63 - if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 63 + if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ?`, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 64 64 return nil, fmt.Errorf("failed to query post: %w", err) 65 65 } 66 66 if dbPost.NotFound || len(dbPost.Raw) == 0 {
+36 -97
main.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "log" 8 9 "log/slog" ··· 19 20 "github.com/bluesky-social/indigo/atproto/identity" 20 21 "github.com/bluesky-social/indigo/atproto/identity/redisdir" 21 22 "github.com/bluesky-social/indigo/atproto/syntax" 22 - "github.com/bluesky-social/indigo/cmd/relay/stream" 23 - "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 24 23 "github.com/bluesky-social/indigo/repo" 25 24 "github.com/bluesky-social/indigo/util/cliutil" 26 25 xrpclib "github.com/bluesky-social/indigo/xrpc" 27 - "github.com/gorilla/websocket" 28 26 "github.com/ipfs/go-cid" 29 27 "github.com/jackc/pgx/v5/pgxpool" 30 28 "github.com/prometheus/client_golang/prometheus" ··· 70 68 }, 71 69 &cli.StringFlag{ 72 70 Name: "redis-url", 71 + }, 72 + &cli.StringFlag{ 73 + Name: "sync-config", 73 74 }, 74 75 } 75 76 app.Action = func(cctx *cli.Context) error { ··· 202 203 203 204 db: db, 204 205 } 205 - fmt.Println("MY DID: ", s.mydid) 206 206 207 207 pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir) 208 208 if err != nil { ··· 241 241 http.ListenAndServe(":4445", nil) 242 242 }() 243 243 244 - seqno, err := loadLastSeq(db, "firehose_seq") 245 - if err != nil { 246 - fmt.Println("failed to load sequence number, starting over", err) 244 + sc := SyncConfig{ 245 + Backends: []SyncBackend{ 246 + { 247 + Type: "firehose", 248 + Host: "bsky.network", 249 + }, 250 + }, 251 + } 252 + 253 + if scfn := cctx.String("sync-config"); scfn != "" { 254 + { 255 + scfi, err := os.Open(scfn) 256 + if err != nil { 257 + return err 258 + } 259 + defer scfi.Close() 260 + 261 + var lsc SyncConfig 262 + if err := json.NewDecoder(scfi).Decode(&lsc); err != nil { 263 + return err 264 + } 265 + sc = lsc 266 + } 247 267 } 248 268 249 - return s.startLiveTail(ctx, int(seqno), 10, 20) 269 + /* 270 + sc.Backends[0] = SyncBackend{ 271 + Type: "jetstream", 272 + Host: "jetstream1.us-west.bsky.network", 273 + } 274 + */ 275 + 276 + return s.StartSyncEngine(ctx, &sc) 277 + 250 278 } 251 279 252 280 app.RunAndExitOnError() ··· 272 300 func (s *Server) getXrpcClient() (*xrpclib.Client, error) { 273 301 // TODO: handle refreshing the token periodically 274 302 return s.client, nil 275 - } 276 - 277 - func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error { 278 - slog.Info("starting live tail") 279 - 280 - // Connect to the Relay websocket 281 - urlStr := fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs) 282 - 283 - d := websocket.DefaultDialer 284 - con, _, err := d.Dial(urlStr, http.Header{ 285 - "User-Agent": []string{"market/0.0.1"}, 286 - }) 287 - if err != nil { 288 - return fmt.Errorf("failed to connect to relay: %w", err) 289 - } 290 - 291 - var lelk sync.Mutex 292 - lastEvent := time.Now() 293 - 294 - go func() { 295 - for range time.Tick(time.Second) { 296 - lelk.Lock() 297 - let := lastEvent 298 - lelk.Unlock() 299 - 300 - if time.Since(let) > time.Second*30 { 301 - slog.Error("firehose connection timed out") 302 - con.Close() 303 - return 304 - } 305 - 306 - } 307 - 308 - }() 309 - 310 - var cclk sync.Mutex 311 - var completeCursor int64 312 - 313 - rsc := &stream.RepoStreamCallbacks{ 314 - RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 315 - ctx := context.Background() 316 - 317 - firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 318 - 319 - s.seqLk.Lock() 320 - if evt.Seq > s.lastSeq { 321 - curs = int(evt.Seq) 322 - s.lastSeq = evt.Seq 323 - 324 - if evt.Seq%1000 == 0 { 325 - if err := storeLastSeq(s.db, "firehose_seq", evt.Seq); err != nil { 326 - fmt.Println("failed to store seqno: ", err) 327 - } 328 - } 329 - } 330 - s.seqLk.Unlock() 331 - 332 - lelk.Lock() 333 - lastEvent = time.Now() 334 - lelk.Unlock() 335 - 336 - if err := s.backend.HandleEvent(ctx, evt); err != nil { 337 - return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 338 - } 339 - 340 - cclk.Lock() 341 - if evt.Seq > completeCursor { 342 - completeCursor = evt.Seq 343 - firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 344 - } 345 - cclk.Unlock() 346 - 347 - return nil 348 - }, 349 - RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 350 - return nil 351 - }, 352 - // TODO: all the other event types 353 - Error: func(errf *stream.ErrorFrame) error { 354 - return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 355 - }, 356 - } 357 - 358 - sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 359 - 360 - //s.eventScheduler = sched 361 - //s.streamFinished = make(chan struct{}) 362 - 363 - return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 364 303 } 365 304 366 305 func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
+8
sync-config-jetstream.json
··· 1 + { 2 + "backends": [ 3 + { 4 + "type": "jetstream", 5 + "host": "jetstream1.us-west.bsky.network" 6 + } 7 + ] 8 + }
+281
sync.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "sync" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/cmd/relay/stream" 13 + "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 14 + jsclient "github.com/bluesky-social/jetstream/pkg/client" 15 + jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" 16 + "github.com/bluesky-social/jetstream/pkg/models" 17 + "github.com/gorilla/websocket" 18 + ) 19 + 20 + type SyncConfig struct { 21 + Backends []SyncBackend `json:"backends"` 22 + } 23 + 24 + type SyncBackend struct { 25 + Type string `json:"type"` 26 + Host string `json:"host"` 27 + MaxWorkers int `json:"max_workers,omitempty"` 28 + } 29 + 30 + func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error { 31 + for _, be := range sc.Backends { 32 + switch be.Type { 33 + case "firehose": 34 + go s.runSyncFirehose(ctx, be) 35 + case "jetstream": 36 + go s.runSyncJetstream(ctx, be) 37 + default: 38 + return fmt.Errorf("unrecognized sync backend type: %q", be.Type) 39 + } 40 + } 41 + 42 + <-ctx.Done() 43 + return fmt.Errorf("exiting sync routine") 44 + } 45 + 46 + const failureTimeInterval = time.Second * 5 47 + 48 + func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) { 49 + var failures int 50 + for { 51 + seqno, err := loadLastSeq(s.db, be.Host) 52 + if err != nil { 53 + fmt.Println("failed to load sequence number, starting over", err) 54 + } 55 + 56 + maxWorkers := 10 57 + if be.MaxWorkers != 0 { 58 + maxWorkers = be.MaxWorkers 59 + } 60 + 61 + start := time.Now() 62 + if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil { 63 + slog.Error("firehose connection lost", "host", be.Host, "error", err) 64 + } 65 + 66 + elapsed := time.Since(start) 67 + 68 + if elapsed > failureTimeInterval { 69 + failures = 0 70 + continue 71 + } 72 + failures++ 73 + 74 + delay := delayForFailureCount(failures) 75 + slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay) 76 + } 77 + } 78 + 79 + func (s *Server) runSyncJetstream(ctx context.Context, be SyncBackend) { 80 + var failures int 81 + for { 82 + // Load last cursor (stored as sequence number in same table) 83 + cursor, err := loadLastSeq(s.db, be.Host) 84 + if err != nil { 85 + slog.Warn("failed to load jetstream cursor, starting from live", "error", err) 86 + cursor = 0 87 + } 88 + 89 + maxWorkers := 10 90 + if be.MaxWorkers != 0 { 91 + maxWorkers = be.MaxWorkers 92 + } 93 + 94 + start := time.Now() 95 + if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil { 96 + slog.Error("jetstream connection lost", "host", be.Host, "error", err) 97 + } 98 + 99 + elapsed := time.Since(start) 100 + 101 + if elapsed > failureTimeInterval { 102 + failures = 0 103 + continue 104 + } 105 + failures++ 106 + 107 + delay := delayForFailureCount(failures) 108 + slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay) 109 + time.Sleep(delay) 110 + } 111 + } 112 + 113 + func delayForFailureCount(n int) time.Duration { 114 + if n < 5 { 115 + return (time.Second * 5) + (time.Second * 2 * time.Duration(n)) 116 + } 117 + 118 + return time.Second * 30 119 + } 120 + 121 + func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error { 122 + ctx, cancel := context.WithCancel(ctx) 123 + defer cancel() 124 + 125 + slog.Info("starting live tail") 126 + 127 + // Connect to the Relay websocket 128 + urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs) 129 + 130 + d := websocket.DefaultDialer 131 + con, _, err := d.Dial(urlStr, http.Header{ 132 + "User-Agent": []string{"konbini/0.0.1"}, 133 + }) 134 + if err != nil { 135 + return fmt.Errorf("failed to connect to relay: %w", err) 136 + } 137 + 138 + var lelk sync.Mutex 139 + lastEvent := time.Now() 140 + 141 + go func() { 142 + tick := time.NewTicker(time.Second) 143 + defer tick.Stop() 144 + for { 145 + select { 146 + case <-tick.C: 147 + lelk.Lock() 148 + let := lastEvent 149 + lelk.Unlock() 150 + 151 + if time.Since(let) > time.Second*30 { 152 + slog.Error("firehose connection timed out") 153 + con.Close() 154 + return 155 + } 156 + case <-ctx.Done(): 157 + return 158 + } 159 + } 160 + }() 161 + 162 + var cclk sync.Mutex 163 + var completeCursor int64 164 + 165 + rsc := &stream.RepoStreamCallbacks{ 166 + RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 167 + ctx := context.Background() 168 + 169 + firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 170 + 171 + s.seqLk.Lock() 172 + if evt.Seq > s.lastSeq { 173 + curs = int(evt.Seq) 174 + s.lastSeq = evt.Seq 175 + 176 + if evt.Seq%1000 == 0 { 177 + if err := storeLastSeq(s.db, host, evt.Seq); err != nil { 178 + fmt.Println("failed to store seqno: ", err) 179 + } 180 + } 181 + } 182 + s.seqLk.Unlock() 183 + 184 + lelk.Lock() 185 + lastEvent = time.Now() 186 + lelk.Unlock() 187 + 188 + if err := s.backend.HandleEvent(ctx, evt); err != nil { 189 + return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 190 + } 191 + 192 + cclk.Lock() 193 + if evt.Seq > completeCursor { 194 + completeCursor = evt.Seq 195 + firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 196 + } 197 + cclk.Unlock() 198 + 199 + return nil 200 + }, 201 + RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 202 + return nil 203 + }, 204 + // TODO: all the other event types 205 + Error: func(errf *stream.ErrorFrame) error { 206 + return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 207 + }, 208 + } 209 + 210 + sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 211 + 212 + return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 213 + } 214 + 215 + func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor int64, parWorkers int) error { 216 + ctx, cancel := context.WithCancel(ctx) 217 + defer cancel() 218 + 219 + slog.Info("starting jetstream tail", "host", host, "cursor", cursor) 220 + 221 + // Create a scheduler for parallel processing 222 + lastStored := int64(0) 223 + sched := jsparallel.NewScheduler( 224 + parWorkers, 225 + host, 226 + slog.Default(), 227 + func(ctx context.Context, event *models.Event) error { 228 + // Update cursor tracking 229 + s.seqLk.Lock() 230 + if event.TimeUS > s.lastSeq { 231 + s.lastSeq = event.TimeUS 232 + if event.TimeUS-lastStored > 1_000_000 { 233 + // Store checkpoint periodically 234 + if err := storeLastSeq(s.db, host, event.TimeUS); err != nil { 235 + slog.Error("failed to store jetstream cursor", "error", err) 236 + } 237 + lastStored = event.TimeUS 238 + } 239 + } 240 + s.seqLk.Unlock() 241 + 242 + // Update metrics 243 + firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS)) 244 + 245 + // Convert Jetstream event to ATProto event format 246 + if event.Commit != nil { 247 + 248 + if err := s.backend.HandleEventJetstream(ctx, event); err != nil { 249 + return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err) 250 + } 251 + 252 + firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS)) 253 + } 254 + 255 + return nil 256 + }, 257 + ) 258 + 259 + // Configure Jetstream client 260 + config := jsclient.DefaultClientConfig() 261 + config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host) 262 + 263 + // Prepare cursor pointer 264 + var cursorPtr *int64 265 + if cursor > 0 { 266 + cursorPtr = &cursor 267 + } 268 + 269 + // Create and connect client 270 + client, err := jsclient.NewClient( 271 + config, 272 + slog.Default(), 273 + sched, 274 + ) 275 + if err != nil { 276 + return fmt.Errorf("create jetstream client: %w", err) 277 + } 278 + 279 + // Start reading from Jetstream 280 + return client.ConnectAndRead(ctx, cursorPtr) 281 + }