A locally focused bluesky appview

Compare changes

Choose any two refs to compare.

+38
README.md
··· 201 201 202 202 It will take a minute but it should pull all records from that user. 203 203 204 + ## Upstream Firehose Configuration 205 + 206 + Konbini supports both standard firehose endpoints and jetstream. If 207 + bandwidth and CPU usage are a concern, and you trust the jetstream endpoint, 208 + then it may be worth trying that out. 209 + 210 + The configuration file (passed via the `--sync-config` flag) is formatted as follows: 211 + 212 + ```json 213 + { 214 + "backends": [ 215 + { 216 + "type": "jetstream", 217 + "host": "jetstream1.us-west.bsky.network" 218 + } 219 + ] 220 + } 221 + ``` 222 + 223 + The default (implicit) configuration file looks like this: 224 + 225 + ```json 226 + { 227 + "backends": [ 228 + { 229 + "type": "firehose", 230 + "host": "bsky.network" 231 + } 232 + ] 233 + } 234 + ``` 235 + 236 + Note that this is an array of backends: you can specify multiple upstreams, and 237 + konbini will read from all of them. The main intended purpose of this is to be 238 + able to subscribe directly to PDSs. PDSs currently only support the full 239 + firehose endpoint, not jetstream, so be sure to specify a type of "firehose" 240 + for individual PDS endpoints. 241 + 204 242 ## License 205 243 206 244 MIT (whyrusleeping)
+55 -7
backend/backend.go
··· 10 10 11 11 "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/api/bsky" 13 + "github.com/bluesky-social/indigo/atproto/identity" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/bluesky-social/indigo/util" 15 16 "github.com/bluesky-social/indigo/xrpc" ··· 26 27 27 28 // PostgresBackend handles database operations 28 29 type PostgresBackend struct { 29 - db *gorm.DB 30 - pgx *pgxpool.Pool 31 - tracker RecordTracker 30 + db *gorm.DB 31 + pgx *pgxpool.Pool 32 + 33 + dir identity.Directory 32 34 33 35 client *xrpc.Client 34 36 ··· 43 45 repoCache *lru.TwoQueueCache[string, *Repo] 44 46 reposLk sync.Mutex 45 47 48 + didByIDCache *lru.TwoQueueCache[uint, string] 49 + 46 50 postInfoCache *lru.TwoQueueCache[string, cachedPostInfo] 51 + 52 + missingRecords chan MissingRecord 47 53 } 48 54 49 55 type cachedPostInfo struct { ··· 52 58 } 53 59 54 60 // NewPostgresBackend creates a new PostgresBackend 55 - func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, tracker RecordTracker) (*PostgresBackend, error) { 61 + func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) { 56 62 rc, _ := lru.New2Q[string, *Repo](1_000_000) 57 63 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 58 64 revc, _ := lru.New2Q[uint, string](1_000_000) 65 + dbic, _ := lru.New2Q[uint, string](1_000_000) 59 66 60 67 b := &PostgresBackend{ 61 68 client: client, 62 69 mydid: mydid, 63 70 db: db, 64 71 pgx: pgx, 65 - tracker: tracker, 66 72 relevantDids: make(map[string]bool), 67 73 repoCache: rc, 68 74 postInfoCache: pc, 69 75 revCache: revc, 76 + didByIDCache: dbic, 77 + dir: dir, 78 + 79 + missingRecords: make(chan MissingRecord, 1000), 70 80 } 71 81 72 82 r, err := b.GetOrCreateRepo(context.TODO(), mydid) ··· 75 85 } 76 86 77 87 b.myrepo = r 88 + 89 + go b.missingRecordFetcher() 78 90 return b, nil 79 91 } 80 92 81 
93 // TrackMissingRecord implements the RecordTracker interface 82 94 func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) { 83 - if b.tracker != nil { 84 - b.tracker.TrackMissingRecord(identifier, wait) 95 + mr := MissingRecord{ 96 + Type: mrTypeFromIdent(identifier), 97 + Identifier: identifier, 98 + Wait: wait, 99 + } 100 + 101 + b.addMissingRecord(context.TODO(), mr) 102 + } 103 + 104 + func mrTypeFromIdent(ident string) MissingRecordType { 105 + if strings.HasPrefix(ident, "did:") { 106 + return MissingRecordTypeProfile 107 + } 108 + 109 + puri, _ := syntax.ParseATURI(ident) 110 + switch puri.Collection().String() { 111 + case "app.bsky.feed.post": 112 + return MissingRecordTypePost 113 + case "app.bsky.feed.generator": 114 + return MissingRecordTypeFeedGenerator 115 + default: 116 + return MissingRecordTypeUnknown 85 117 } 118 + 86 119 } 87 120 88 121 // DidToID converts a DID to a database ID ··· 363 396 } 364 397 365 398 return &r, nil 399 + } 400 + 401 + func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) { 402 + val, ok := b.didByIDCache.Get(uid) 403 + if ok { 404 + return val, nil 405 + } 406 + 407 + r, err := b.GetRepoByID(ctx, uid) 408 + if err != nil { 409 + return "", err 410 + } 411 + 412 + b.didByIDCache.Add(uid, r.Did) 413 + return r.Did, nil 366 414 } 367 415 368 416 func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
+68
backend/events.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "log/slog" 8 9 "strings" ··· 11 12 "github.com/bluesky-social/indigo/api/atproto" 12 13 "github.com/bluesky-social/indigo/api/bsky" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 + lexutil "github.com/bluesky-social/indigo/lex/util" 14 16 "github.com/bluesky-social/indigo/repo" 17 + jsmodels "github.com/bluesky-social/jetstream/pkg/models" 15 18 "github.com/ipfs/go-cid" 16 19 "github.com/jackc/pgx/v5/pgconn" 17 20 "github.com/prometheus/client_golang/prometheus" ··· 63 66 return fmt.Errorf("failed to update rev: %w", err) 64 67 } 65 68 */ 69 + 70 + return nil 71 + } 72 + 73 + func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) { 74 + val, err := lexutil.NewFromType(evt.Commit.Collection) 75 + if err != nil { 76 + return nil, fmt.Errorf("failed to load event record type: %w", err) 77 + } 78 + 79 + if err := json.Unmarshal(evt.Commit.Record, val); err != nil { 80 + return nil, err 81 + } 82 + 83 + cval, ok := val.(lexutil.CBOR) 84 + if !ok { 85 + return nil, fmt.Errorf("decoded type was not cbor marshalable") 86 + } 87 + 88 + buf := new(bytes.Buffer) 89 + if err := cval.MarshalCBOR(buf); err != nil { 90 + return nil, fmt.Errorf("failed to marshal event to cbor: %w", err) 91 + } 92 + 93 + rec := buf.Bytes() 94 + return rec, nil 95 + } 96 + 97 + func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error { 98 + 99 + path := evt.Commit.Collection + "/" + evt.Commit.RKey 100 + switch evt.Commit.Operation { 101 + case jsmodels.CommitOperationCreate: 102 + rec, err := cborBytesFromEvent(evt) 103 + if err != nil { 104 + return err 105 + } 106 + 107 + c, err := cid.Decode(evt.Commit.CID) 108 + if err != nil { 109 + return err 110 + } 111 + 112 + if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 113 + return fmt.Errorf("create record failed: %w", err) 114 + } 115 + case jsmodels.CommitOperationUpdate: 116 + 
rec, err := cborBytesFromEvent(evt) 117 + if err != nil { 118 + return err 119 + } 120 + 121 + c, err := cid.Decode(evt.Commit.CID) 122 + if err != nil { 123 + return err 124 + } 125 + 126 + if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 127 + return fmt.Errorf("update record failed: %w", err) 128 + } 129 + case jsmodels.CommitOperationDelete: 130 + if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil { 131 + return fmt.Errorf("delete record failed: %w", err) 132 + } 133 + } 66 134 67 135 return nil 68 136 }
+211
backend/missing.go
··· 1 + package backend 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + xrpclib "github.com/bluesky-social/indigo/xrpc" 13 + "github.com/ipfs/go-cid" 14 + ) 15 + 16 + type MissingRecordType string 17 + 18 + const ( 19 + MissingRecordTypeProfile MissingRecordType = "profile" 20 + MissingRecordTypePost MissingRecordType = "post" 21 + MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator" 22 + MissingRecordTypeUnknown MissingRecordType = "unknown" 23 + ) 24 + 25 + type MissingRecord struct { 26 + Type MissingRecordType 27 + Identifier string // DID for profiles, AT-URI for posts/feedgens 28 + Wait bool 29 + 30 + waitch chan struct{} 31 + } 32 + 33 + func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) { 34 + if rec.Wait { 35 + rec.waitch = make(chan struct{}) 36 + } 37 + 38 + select { 39 + case b.missingRecords <- rec: 40 + case <-ctx.Done(): 41 + } 42 + 43 + if rec.Wait { 44 + select { 45 + case <-rec.waitch: 46 + case <-ctx.Done(): 47 + } 48 + } 49 + } 50 + 51 + func (b *PostgresBackend) missingRecordFetcher() { 52 + for rec := range b.missingRecords { 53 + var err error 54 + switch rec.Type { 55 + case MissingRecordTypeProfile: 56 + err = b.fetchMissingProfile(context.TODO(), rec.Identifier) 57 + case MissingRecordTypePost: 58 + err = b.fetchMissingPost(context.TODO(), rec.Identifier) 59 + case MissingRecordTypeFeedGenerator: 60 + err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier) 61 + default: 62 + slog.Error("unknown missing record type", "type", rec.Type) 63 + continue 64 + } 65 + 66 + if err != nil { 67 + slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err) 68 + } 69 + 70 + if rec.Wait { 71 + close(rec.waitch) 72 + } 73 + } 74 + } 75 + 76 + func (b *PostgresBackend) 
fetchMissingProfile(ctx context.Context, did string) error { 77 + b.AddRelevantDid(did) 78 + 79 + repo, err := b.GetOrCreateRepo(ctx, did) 80 + if err != nil { 81 + return err 82 + } 83 + 84 + resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 85 + if err != nil { 86 + return err 87 + } 88 + 89 + c := &xrpclib.Client{ 90 + Host: resp.PDSEndpoint(), 91 + } 92 + 93 + rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self") 94 + if err != nil { 95 + return err 96 + } 97 + 98 + prof, ok := rec.Value.Val.(*bsky.ActorProfile) 99 + if !ok { 100 + return fmt.Errorf("record we got back wasnt a profile somehow") 101 + } 102 + 103 + buf := new(bytes.Buffer) 104 + if err := prof.MarshalCBOR(buf); err != nil { 105 + return err 106 + } 107 + 108 + cc, err := cid.Decode(*rec.Cid) 109 + if err != nil { 110 + return err 111 + } 112 + 113 + return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc) 114 + } 115 + 116 + func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error { 117 + puri, err := syntax.ParseATURI(uri) 118 + if err != nil { 119 + return fmt.Errorf("invalid AT URI: %s", uri) 120 + } 121 + 122 + did := puri.Authority().String() 123 + collection := puri.Collection().String() 124 + rkey := puri.RecordKey().String() 125 + 126 + b.AddRelevantDid(did) 127 + 128 + repo, err := b.GetOrCreateRepo(ctx, did) 129 + if err != nil { 130 + return err 131 + } 132 + 133 + resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 134 + if err != nil { 135 + return err 136 + } 137 + 138 + c := &xrpclib.Client{ 139 + Host: resp.PDSEndpoint(), 140 + } 141 + 142 + rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 143 + if err != nil { 144 + return err 145 + } 146 + 147 + post, ok := rec.Value.Val.(*bsky.FeedPost) 148 + if !ok { 149 + return fmt.Errorf("record we got back wasn't a post somehow") 150 + } 151 + 152 + buf := new(bytes.Buffer) 153 + if err := post.MarshalCBOR(buf); err != nil { 154 + return err 155 + } 
156 + 157 + cc, err := cid.Decode(*rec.Cid) 158 + if err != nil { 159 + return err 160 + } 161 + 162 + return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc) 163 + } 164 + 165 + func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error { 166 + puri, err := syntax.ParseATURI(uri) 167 + if err != nil { 168 + return fmt.Errorf("invalid AT URI: %s", uri) 169 + } 170 + 171 + did := puri.Authority().String() 172 + collection := puri.Collection().String() 173 + rkey := puri.RecordKey().String() 174 + b.AddRelevantDid(did) 175 + 176 + repo, err := b.GetOrCreateRepo(ctx, did) 177 + if err != nil { 178 + return err 179 + } 180 + 181 + resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 182 + if err != nil { 183 + return err 184 + } 185 + 186 + c := &xrpclib.Client{ 187 + Host: resp.PDSEndpoint(), 188 + } 189 + 190 + rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 191 + if err != nil { 192 + return err 193 + } 194 + 195 + feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator) 196 + if !ok { 197 + return fmt.Errorf("record we got back wasn't a feed generator somehow") 198 + } 199 + 200 + buf := new(bytes.Buffer) 201 + if err := feedGen.MarshalCBOR(buf); err != nil { 202 + return err 203 + } 204 + 205 + cc, err := cid.Decode(*rec.Cid) 206 + if err != nil { 207 + return err 208 + } 209 + 210 + return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc) 211 + }
+6 -5
go.mod
··· 3 3 go 1.25.1 4 4 5 5 require ( 6 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f 6 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe 7 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 7 8 github.com/gorilla/websocket v1.5.1 8 9 github.com/hashicorp/golang-lru/v2 v2.0.7 9 10 github.com/ipfs/go-cid v0.4.1 ··· 60 61 github.com/ipfs/go-metrics-interface v0.0.1 // indirect 61 62 github.com/ipfs/go-peertaskqueue v0.8.1 // indirect 62 63 github.com/ipfs/go-verifcid v0.0.3 // indirect 63 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect 64 + github.com/ipld/go-car v0.6.2 // indirect 64 65 github.com/ipld/go-codec-dagpb v1.6.0 // indirect 65 66 github.com/ipld/go-ipld-prime v0.21.0 // indirect 66 67 github.com/jackc/pgpassfile v1.0.0 // indirect ··· 69 70 github.com/jbenet/goprocess v0.1.4 // indirect 70 71 github.com/jinzhu/inflection v1.0.0 // indirect 71 72 github.com/jinzhu/now v1.1.5 // indirect 72 - github.com/klauspost/compress v1.17.3 // indirect 73 + github.com/klauspost/compress v1.17.9 // indirect 73 74 github.com/klauspost/cpuid/v2 v2.2.7 // indirect 74 75 github.com/lestrrat-go/blackmagic v1.0.1 // indirect 75 76 github.com/lestrrat-go/httpcc v1.0.1 // indirect ··· 91 92 github.com/orandin/slog-gorm v1.3.2 // indirect 92 93 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 93 94 github.com/prometheus/client_model v0.6.1 // indirect 94 - github.com/prometheus/common v0.48.0 // indirect 95 - github.com/prometheus/procfs v0.12.0 // indirect 95 + github.com/prometheus/common v0.54.0 // indirect 96 + github.com/prometheus/procfs v0.15.1 // indirect 96 97 github.com/redis/go-redis/v9 v9.3.0 // indirect 97 98 github.com/russross/blackfriday/v2 v2.1.0 // indirect 98 99 github.com/segmentio/asm v1.2.0 // indirect
+12 -10
go.sum
··· 6 6 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 7 7 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= 8 8 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 9 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f h1:FugOoTzh0nCMTWGqNGsjttFWVPcwxaaGD3p/nE9V8qY= 10 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8= 9 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo= 10 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck= 11 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU= 12 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs= 11 13 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= 12 14 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= 13 15 github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= ··· 158 160 github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= 159 161 github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs= 160 162 github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw= 161 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI= 162 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA= 163 + github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= 164 + 
github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= 163 165 github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= 164 166 github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= 165 167 github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= ··· 188 190 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 189 191 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 190 192 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= 191 - github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= 192 - github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= 193 + github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= 194 + github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 193 195 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 194 196 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 195 197 github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8= ··· 312 314 github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= 313 315 github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 314 316 github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= 315 - github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= 316 - github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= 317 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 318 - github.com/prometheus/procfs v0.12.0/go.mod 
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 317 + github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= 318 + github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= 319 + github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= 320 + github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 319 321 github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= 320 322 github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= 321 323 github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+8 -8
handlers.go
··· 146 146 } 147 147 148 148 if profile.Raw == nil || len(profile.Raw) == 0 { 149 - s.addMissingProfile(ctx, accdid) 149 + s.backend.TrackMissingRecord(accdid, false) 150 150 return e.JSON(404, map[string]any{ 151 151 "error": "missing profile info for user", 152 152 }) ··· 307 307 } 308 308 309 309 if profile.Raw == nil || len(profile.Raw) == 0 { 310 - s.addMissingProfile(ctx, r.Did) 310 + s.backend.TrackMissingRecord(r.Did, false) 311 311 return &authorInfo{ 312 312 Handle: resp.Handle.String(), 313 313 Did: r.Did, ··· 379 379 380 380 uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey) 381 381 if len(p.Raw) == 0 || p.NotFound { 382 - s.addMissingPost(ctx, uri) 382 + s.backend.TrackMissingRecord(uri, false) 383 383 posts[ix] = postResponse{ 384 384 Uri: uri, 385 385 Missing: true, ··· 515 515 quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*") 516 516 if err != nil { 517 517 slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err) 518 - s.addMissingPost(ctx, quotedURI) 518 + s.backend.TrackMissingRecord(quotedURI, false) 519 519 return s.buildQuoteFallback(quotedURI, quotedCid) 520 520 } 521 521 522 522 if quotedPost == nil || quotedPost.Raw == nil || len(quotedPost.Raw) == 0 || quotedPost.NotFound { 523 - s.addMissingPost(ctx, quotedURI) 523 + s.backend.TrackMissingRecord(quotedURI, false) 524 524 return s.buildQuoteFallback(quotedURI, quotedCid) 525 525 } 526 526 ··· 707 707 prof = &p 708 708 } 709 709 } else { 710 - s.addMissingProfile(ctx, r.Did) 710 + s.backend.TrackMissingRecord(r.Did, false) 711 711 } 712 712 713 713 users = append(users, engagementUser{ ··· 767 767 prof = &p 768 768 } 769 769 } else { 770 - s.addMissingProfile(ctx, r.Did) 770 + s.backend.TrackMissingRecord(r.Did, false) 771 771 } 772 772 773 773 users = append(users, engagementUser{ ··· 835 835 prof = &p 836 836 } 837 837 } else { 838 - s.addMissingProfile(ctx, r.Did) 838 + s.backend.TrackMissingRecord(r.Did, false) 839 839 } 840 840 841 841 users = 
append(users, engagementUser{
+16 -10
hydration/post.go
··· 17 17 18 18 // PostInfo contains hydrated post information 19 19 type PostInfo struct { 20 + ID uint 20 21 URI string 21 22 Cid string 22 23 Post *bsky.FeedPost ··· 39 40 ctx, span := tracer.Start(ctx, "hydratePost") 40 41 defer span.End() 41 42 43 + p, err := h.backend.GetPostByUri(ctx, uri, "*") 44 + if err != nil { 45 + return nil, err 46 + } 47 + 48 + return h.HydratePostDB(ctx, uri, p, viewerDID) 49 + } 50 + 51 + func (h *Hydrator) HydratePostDB(ctx context.Context, uri string, dbPost *models.Post, viewerDID string) (*PostInfo, error) { 42 52 autoFetch, _ := ctx.Value("auto-fetch").(bool) 43 53 44 54 authorDid := extractDIDFromURI(uri) ··· 47 57 return nil, err 48 58 } 49 59 50 - // Query post from database 51 - var dbPost models.Post 52 - if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 53 - return nil, fmt.Errorf("failed to query post: %w", err) 54 - } 55 - 56 60 if dbPost.NotFound || len(dbPost.Raw) == 0 { 57 61 if autoFetch { 58 62 h.AddMissingRecord(uri, true) 59 - if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 63 + if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ?`, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 60 64 return nil, fmt.Errorf("failed to query post: %w", err) 61 65 } 62 66 if dbPost.NotFound || len(dbPost.Raw) == 0 { ··· 75 79 76 80 var wg sync.WaitGroup 77 81 78 - // Get author DID 79 - 80 - authorDID := extractDIDFromURI(uri) 82 + authorDID := r.Did 81 83 82 84 // Get engagement counts 83 85 var likes, reposts, replies int ··· 121 123 wg.Wait() 122 124 123 125 info := &PostInfo{ 126 + ID: dbPost.ID, 124 127 URI: uri, 125 128 Cid: dbPost.Cid, 126 129 Post: &feedPost, ··· 385 388 386 389 // hydrateEmbeddedRecord hydrates an embedded record (for quote posts, etc.) 
387 390 func (h *Hydrator) hydrateEmbeddedRecord(ctx context.Context, uri string, viewerDID string) *bsky.EmbedRecord_View_Record { 391 + ctx, span := tracer.Start(ctx, "hydrateEmbeddedRecord") 392 + defer span.End() 393 + 388 394 // Check if it's a post URI 389 395 if !isPostURI(uri) { 390 396 // Could be a feed generator, list, labeler, or starter pack
+10
hydration/utils.go
··· 5 5 "fmt" 6 6 7 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 + "github.com/whyrusleeping/market/models" 8 9 ) 9 10 10 11 func (h *Hydrator) NormalizeUri(ctx context.Context, uri string) (string, error) { ··· 27 28 28 29 return fmt.Sprintf("at://%s/%s/%s", did, puri.Collection().String(), puri.RecordKey().String()), nil 29 30 } 31 + 32 + func (h *Hydrator) UriForPost(ctx context.Context, p *models.Post) (string, error) { 33 + did, err := h.backend.DidFromID(ctx, p.Author) 34 + if err != nil { 35 + return "", err 36 + } 37 + 38 + return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, p.Rkey), nil 39 + }
+39 -103
main.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "log" 8 9 "log/slog" ··· 19 20 "github.com/bluesky-social/indigo/atproto/identity" 20 21 "github.com/bluesky-social/indigo/atproto/identity/redisdir" 21 22 "github.com/bluesky-social/indigo/atproto/syntax" 22 - "github.com/bluesky-social/indigo/cmd/relay/stream" 23 - "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 24 23 "github.com/bluesky-social/indigo/repo" 25 24 "github.com/bluesky-social/indigo/util/cliutil" 26 25 xrpclib "github.com/bluesky-social/indigo/xrpc" 27 - "github.com/gorilla/websocket" 28 26 "github.com/ipfs/go-cid" 29 27 "github.com/jackc/pgx/v5/pgxpool" 30 28 "github.com/prometheus/client_golang/prometheus" ··· 71 69 &cli.StringFlag{ 72 70 Name: "redis-url", 73 71 }, 72 + &cli.StringFlag{ 73 + Name: "sync-config", 74 + }, 74 75 } 75 76 app.Action = func(cctx *cli.Context) error { 76 77 db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections")) ··· 135 136 db.AutoMigrate(SequenceTracker{}) 136 137 db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)") 137 138 db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)") 139 + db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)") 138 140 139 141 ctx := context.TODO() 140 142 ··· 199 201 client: cc, 200 202 dir: dir, 201 203 202 - missingRecords: make(chan MissingRecord, 1024), 203 - db: db, 204 + db: db, 204 205 } 205 - fmt.Println("MY DID: ", s.mydid) 206 206 207 - pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, nil) 207 + pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir) 208 208 if err != nil { 209 209 return err 210 210 } ··· 241 241 http.ListenAndServe(":4445", nil) 242 242 }() 243 243 244 - go s.missingRecordFetcher() 244 + sc := SyncConfig{ 245 + Backends: []SyncBackend{ 246 + { 247 + Type: "firehose", 248 + Host: "bsky.network", 249 + }, 250 + }, 251 + } 245 252 246 - seqno, 
err := loadLastSeq(db, "firehose_seq") 247 - if err != nil { 248 - fmt.Println("failed to load sequence number, starting over", err) 253 + if scfn := cctx.String("sync-config"); scfn != "" { 254 + { 255 + scfi, err := os.Open(scfn) 256 + if err != nil { 257 + return err 258 + } 259 + defer scfi.Close() 260 + 261 + var lsc SyncConfig 262 + if err := json.NewDecoder(scfi).Decode(&lsc); err != nil { 263 + return err 264 + } 265 + sc = lsc 266 + } 249 267 } 250 268 251 - return s.startLiveTail(ctx, int(seqno), 10, 20) 269 + /* 270 + sc.Backends[0] = SyncBackend{ 271 + Type: "jetstream", 272 + Host: "jetstream1.us-west.bsky.network", 273 + } 274 + */ 275 + 276 + return s.StartSyncEngine(ctx, &sc) 277 + 252 278 } 253 279 254 280 app.RunAndExitOnError() ··· 266 292 seqLk sync.Mutex 267 293 lastSeq int64 268 294 269 - mpLk sync.Mutex 270 - missingRecords chan MissingRecord 295 + mpLk sync.Mutex 271 296 272 297 db *gorm.DB 273 298 } ··· 275 300 func (s *Server) getXrpcClient() (*xrpclib.Client, error) { 276 301 // TODO: handle refreshing the token periodically 277 302 return s.client, nil 278 - } 279 - 280 - func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error { 281 - slog.Info("starting live tail") 282 - 283 - // Connect to the Relay websocket 284 - urlStr := fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs) 285 - 286 - d := websocket.DefaultDialer 287 - con, _, err := d.Dial(urlStr, http.Header{ 288 - "User-Agent": []string{"market/0.0.1"}, 289 - }) 290 - if err != nil { 291 - return fmt.Errorf("failed to connect to relay: %w", err) 292 - } 293 - 294 - var lelk sync.Mutex 295 - lastEvent := time.Now() 296 - 297 - go func() { 298 - for range time.Tick(time.Second) { 299 - lelk.Lock() 300 - let := lastEvent 301 - lelk.Unlock() 302 - 303 - if time.Since(let) > time.Second*30 { 304 - slog.Error("firehose connection timed out") 305 - con.Close() 306 - return 307 - } 308 - 309 - } 310 - 311 - }() 312 - 
313 - var cclk sync.Mutex 314 - var completeCursor int64 315 - 316 - rsc := &stream.RepoStreamCallbacks{ 317 - RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 318 - ctx := context.Background() 319 - 320 - firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 321 - 322 - s.seqLk.Lock() 323 - if evt.Seq > s.lastSeq { 324 - curs = int(evt.Seq) 325 - s.lastSeq = evt.Seq 326 - 327 - if evt.Seq%1000 == 0 { 328 - if err := storeLastSeq(s.db, "firehose_seq", evt.Seq); err != nil { 329 - fmt.Println("failed to store seqno: ", err) 330 - } 331 - } 332 - } 333 - s.seqLk.Unlock() 334 - 335 - lelk.Lock() 336 - lastEvent = time.Now() 337 - lelk.Unlock() 338 - 339 - if err := s.backend.HandleEvent(ctx, evt); err != nil { 340 - return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 341 - } 342 - 343 - cclk.Lock() 344 - if evt.Seq > completeCursor { 345 - completeCursor = evt.Seq 346 - firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 347 - } 348 - cclk.Unlock() 349 - 350 - return nil 351 - }, 352 - RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 353 - return nil 354 - }, 355 - // TODO: all the other event types 356 - Error: func(errf *stream.ErrorFrame) error { 357 - return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 358 - }, 359 - } 360 - 361 - sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 362 - 363 - //s.eventScheduler = sched 364 - //s.streamFinished = make(chan struct{}) 365 - 366 - return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 367 303 } 368 304 369 305 func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
-234
missing.go
··· 1 - package main 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "fmt" 7 - "log/slog" 8 - 9 - "github.com/bluesky-social/indigo/api/atproto" 10 - "github.com/bluesky-social/indigo/api/bsky" 11 - "github.com/bluesky-social/indigo/atproto/syntax" 12 - xrpclib "github.com/bluesky-social/indigo/xrpc" 13 - "github.com/ipfs/go-cid" 14 - ) 15 - 16 - type MissingRecordType string 17 - 18 - const ( 19 - MissingRecordTypeProfile MissingRecordType = "profile" 20 - MissingRecordTypePost MissingRecordType = "post" 21 - MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator" 22 - ) 23 - 24 - type MissingRecord struct { 25 - Type MissingRecordType 26 - Identifier string // DID for profiles, AT-URI for posts/feedgens 27 - Wait bool 28 - 29 - waitch chan struct{} 30 - } 31 - 32 - func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) { 33 - if rec.Wait { 34 - rec.waitch = make(chan struct{}) 35 - } 36 - 37 - select { 38 - case s.missingRecords <- rec: 39 - case <-ctx.Done(): 40 - } 41 - 42 - if rec.Wait { 43 - select { 44 - case <-rec.waitch: 45 - case <-ctx.Done(): 46 - } 47 - } 48 - } 49 - 50 - // Legacy methods for backward compatibility 51 - func (s *Server) addMissingProfile(ctx context.Context, did string) { 52 - s.addMissingRecord(ctx, MissingRecord{ 53 - Type: MissingRecordTypeProfile, 54 - Identifier: did, 55 - }) 56 - } 57 - 58 - func (s *Server) addMissingPost(ctx context.Context, uri string) { 59 - slog.Info("adding missing post to fetch queue", "uri", uri) 60 - s.addMissingRecord(ctx, MissingRecord{ 61 - Type: MissingRecordTypePost, 62 - Identifier: uri, 63 - }) 64 - } 65 - 66 - func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) { 67 - slog.Info("adding missing feed generator to fetch queue", "uri", uri) 68 - s.addMissingRecord(ctx, MissingRecord{ 69 - Type: MissingRecordTypeFeedGenerator, 70 - Identifier: uri, 71 - }) 72 - } 73 - 74 - func (s *Server) missingRecordFetcher() { 75 - for rec := range 
s.missingRecords { 76 - var err error 77 - switch rec.Type { 78 - case MissingRecordTypeProfile: 79 - err = s.fetchMissingProfile(context.TODO(), rec.Identifier) 80 - case MissingRecordTypePost: 81 - err = s.fetchMissingPost(context.TODO(), rec.Identifier) 82 - case MissingRecordTypeFeedGenerator: 83 - err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier) 84 - default: 85 - slog.Error("unknown missing record type", "type", rec.Type) 86 - continue 87 - } 88 - 89 - if err != nil { 90 - slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err) 91 - } 92 - 93 - if rec.Wait { 94 - close(rec.waitch) 95 - } 96 - } 97 - } 98 - 99 - func (s *Server) fetchMissingProfile(ctx context.Context, did string) error { 100 - s.backend.AddRelevantDid(did) 101 - 102 - repo, err := s.backend.GetOrCreateRepo(ctx, did) 103 - if err != nil { 104 - return err 105 - } 106 - 107 - resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 108 - if err != nil { 109 - return err 110 - } 111 - 112 - c := &xrpclib.Client{ 113 - Host: resp.PDSEndpoint(), 114 - } 115 - 116 - rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self") 117 - if err != nil { 118 - return err 119 - } 120 - 121 - prof, ok := rec.Value.Val.(*bsky.ActorProfile) 122 - if !ok { 123 - return fmt.Errorf("record we got back wasnt a profile somehow") 124 - } 125 - 126 - buf := new(bytes.Buffer) 127 - if err := prof.MarshalCBOR(buf); err != nil { 128 - return err 129 - } 130 - 131 - cc, err := cid.Decode(*rec.Cid) 132 - if err != nil { 133 - return err 134 - } 135 - 136 - return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc) 137 - } 138 - 139 - func (s *Server) fetchMissingPost(ctx context.Context, uri string) error { 140 - puri, err := syntax.ParseATURI(uri) 141 - if err != nil { 142 - return fmt.Errorf("invalid AT URI: %s", uri) 143 - } 144 - 145 - did := puri.Authority().String() 146 - collection := puri.Collection().String() 
147 - rkey := puri.RecordKey().String() 148 - 149 - s.backend.AddRelevantDid(did) 150 - 151 - repo, err := s.backend.GetOrCreateRepo(ctx, did) 152 - if err != nil { 153 - return err 154 - } 155 - 156 - resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 157 - if err != nil { 158 - return err 159 - } 160 - 161 - c := &xrpclib.Client{ 162 - Host: resp.PDSEndpoint(), 163 - } 164 - 165 - rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 166 - if err != nil { 167 - return err 168 - } 169 - 170 - post, ok := rec.Value.Val.(*bsky.FeedPost) 171 - if !ok { 172 - return fmt.Errorf("record we got back wasn't a post somehow") 173 - } 174 - 175 - buf := new(bytes.Buffer) 176 - if err := post.MarshalCBOR(buf); err != nil { 177 - return err 178 - } 179 - 180 - cc, err := cid.Decode(*rec.Cid) 181 - if err != nil { 182 - return err 183 - } 184 - 185 - return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc) 186 - } 187 - 188 - func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error { 189 - puri, err := syntax.ParseATURI(uri) 190 - if err != nil { 191 - return fmt.Errorf("invalid AT URI: %s", uri) 192 - } 193 - 194 - did := puri.Authority().String() 195 - collection := puri.Collection().String() 196 - rkey := puri.RecordKey().String() 197 - s.backend.AddRelevantDid(did) 198 - 199 - repo, err := s.backend.GetOrCreateRepo(ctx, did) 200 - if err != nil { 201 - return err 202 - } 203 - 204 - resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 205 - if err != nil { 206 - return err 207 - } 208 - 209 - c := &xrpclib.Client{ 210 - Host: resp.PDSEndpoint(), 211 - } 212 - 213 - rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 214 - if err != nil { 215 - return err 216 - } 217 - 218 - feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator) 219 - if !ok { 220 - return fmt.Errorf("record we got back wasn't a feed generator somehow") 221 - } 222 - 223 - buf := new(bytes.Buffer) 224 - if err := feedGen.MarshalCBOR(buf); err != 
nil { 225 - return err 226 - } 227 - 228 - cc, err := cid.Decode(*rec.Cid) 229 - if err != nil { 230 - return err 231 - } 232 - 233 - return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc) 234 - }
+8
sync-config-jetstream.json
··· 1 + { 2 + "backends": [ 3 + { 4 + "type": "jetstream", 5 + "host": "jetstream1.us-west.bsky.network" 6 + } 7 + ] 8 + }
+281
sync.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "sync" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/cmd/relay/stream" 13 + "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 14 + jsclient "github.com/bluesky-social/jetstream/pkg/client" 15 + jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" 16 + "github.com/bluesky-social/jetstream/pkg/models" 17 + "github.com/gorilla/websocket" 18 + ) 19 + 20 + type SyncConfig struct { 21 + Backends []SyncBackend `json:"backends"` 22 + } 23 + 24 + type SyncBackend struct { 25 + Type string `json:"type"` 26 + Host string `json:"host"` 27 + MaxWorkers int `json:"max_workers,omitempty"` 28 + } 29 + 30 + func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error { 31 + for _, be := range sc.Backends { 32 + switch be.Type { 33 + case "firehose": 34 + go s.runSyncFirehose(ctx, be) 35 + case "jetstream": 36 + go s.runSyncJetstream(ctx, be) 37 + default: 38 + return fmt.Errorf("unrecognized sync backend type: %q", be.Type) 39 + } 40 + } 41 + 42 + <-ctx.Done() 43 + return fmt.Errorf("exiting sync routine") 44 + } 45 + 46 + const failureTimeInterval = time.Second * 5 47 + 48 + func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) { 49 + var failures int 50 + for { 51 + seqno, err := loadLastSeq(s.db, be.Host) 52 + if err != nil { 53 + fmt.Println("failed to load sequence number, starting over", err) 54 + } 55 + 56 + maxWorkers := 10 57 + if be.MaxWorkers != 0 { 58 + maxWorkers = be.MaxWorkers 59 + } 60 + 61 + start := time.Now() 62 + if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil { 63 + slog.Error("firehose connection lost", "host", be.Host, "error", err) 64 + } 65 + 66 + elapsed := time.Since(start) 67 + 68 + if elapsed > failureTimeInterval { 69 + failures = 0 70 + continue 71 + } 72 + failures++ 73 + 74 + delay := 
delayForFailureCount(failures) 75 + slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay) 76 + } 77 + } 78 + 79 + func (s *Server) runSyncJetstream(ctx context.Context, be SyncBackend) { 80 + var failures int 81 + for { 82 + // Load last cursor (stored as sequence number in same table) 83 + cursor, err := loadLastSeq(s.db, be.Host) 84 + if err != nil { 85 + slog.Warn("failed to load jetstream cursor, starting from live", "error", err) 86 + cursor = 0 87 + } 88 + 89 + maxWorkers := 10 90 + if be.MaxWorkers != 0 { 91 + maxWorkers = be.MaxWorkers 92 + } 93 + 94 + start := time.Now() 95 + if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil { 96 + slog.Error("jetstream connection lost", "host", be.Host, "error", err) 97 + } 98 + 99 + elapsed := time.Since(start) 100 + 101 + if elapsed > failureTimeInterval { 102 + failures = 0 103 + continue 104 + } 105 + failures++ 106 + 107 + delay := delayForFailureCount(failures) 108 + slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay) 109 + time.Sleep(delay) 110 + } 111 + } 112 + 113 + func delayForFailureCount(n int) time.Duration { 114 + if n < 5 { 115 + return (time.Second * 5) + (time.Second * 2 * time.Duration(n)) 116 + } 117 + 118 + return time.Second * 30 119 + } 120 + 121 + func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error { 122 + ctx, cancel := context.WithCancel(ctx) 123 + defer cancel() 124 + 125 + slog.Info("starting live tail") 126 + 127 + // Connect to the Relay websocket 128 + urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs) 129 + 130 + d := websocket.DefaultDialer 131 + con, _, err := d.Dial(urlStr, http.Header{ 132 + "User-Agent": []string{"konbini/0.0.1"}, 133 + }) 134 + if err != nil { 135 + return fmt.Errorf("failed to connect to relay: %w", err) 136 + } 137 + 138 + var lelk sync.Mutex 139 + lastEvent := time.Now() 140 + 141 + go 
func() { 142 + tick := time.NewTicker(time.Second) 143 + defer tick.Stop() 144 + for { 145 + select { 146 + case <-tick.C: 147 + lelk.Lock() 148 + let := lastEvent 149 + lelk.Unlock() 150 + 151 + if time.Since(let) > time.Second*30 { 152 + slog.Error("firehose connection timed out") 153 + con.Close() 154 + return 155 + } 156 + case <-ctx.Done(): 157 + return 158 + } 159 + } 160 + }() 161 + 162 + var cclk sync.Mutex 163 + var completeCursor int64 164 + 165 + rsc := &stream.RepoStreamCallbacks{ 166 + RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 167 + ctx := context.Background() 168 + 169 + firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 170 + 171 + s.seqLk.Lock() 172 + if evt.Seq > s.lastSeq { 173 + curs = int(evt.Seq) 174 + s.lastSeq = evt.Seq 175 + 176 + if evt.Seq%1000 == 0 { 177 + if err := storeLastSeq(s.db, host, evt.Seq); err != nil { 178 + fmt.Println("failed to store seqno: ", err) 179 + } 180 + } 181 + } 182 + s.seqLk.Unlock() 183 + 184 + lelk.Lock() 185 + lastEvent = time.Now() 186 + lelk.Unlock() 187 + 188 + if err := s.backend.HandleEvent(ctx, evt); err != nil { 189 + return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 190 + } 191 + 192 + cclk.Lock() 193 + if evt.Seq > completeCursor { 194 + completeCursor = evt.Seq 195 + firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 196 + } 197 + cclk.Unlock() 198 + 199 + return nil 200 + }, 201 + RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 202 + return nil 203 + }, 204 + // TODO: all the other event types 205 + Error: func(errf *stream.ErrorFrame) error { 206 + return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 207 + }, 208 + } 209 + 210 + sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 211 + 212 + return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 213 + } 214 + 215 + func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor 
int64, parWorkers int) error { 216 + ctx, cancel := context.WithCancel(ctx) 217 + defer cancel() 218 + 219 + slog.Info("starting jetstream tail", "host", host, "cursor", cursor) 220 + 221 + // Create a scheduler for parallel processing 222 + lastStored := int64(0) 223 + sched := jsparallel.NewScheduler( 224 + parWorkers, 225 + host, 226 + slog.Default(), 227 + func(ctx context.Context, event *models.Event) error { 228 + // Update cursor tracking 229 + s.seqLk.Lock() 230 + if event.TimeUS > s.lastSeq { 231 + s.lastSeq = event.TimeUS 232 + if event.TimeUS-lastStored > 1_000_000 { 233 + // Store checkpoint periodically 234 + if err := storeLastSeq(s.db, host, event.TimeUS); err != nil { 235 + slog.Error("failed to store jetstream cursor", "error", err) 236 + } 237 + lastStored = event.TimeUS 238 + } 239 + } 240 + s.seqLk.Unlock() 241 + 242 + // Update metrics 243 + firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS)) 244 + 245 + // Convert Jetstream event to ATProto event format 246 + if event.Commit != nil { 247 + 248 + if err := s.backend.HandleEventJetstream(ctx, event); err != nil { 249 + return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err) 250 + } 251 + 252 + firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS)) 253 + } 254 + 255 + return nil 256 + }, 257 + ) 258 + 259 + // Configure Jetstream client 260 + config := jsclient.DefaultClientConfig() 261 + config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host) 262 + 263 + // Prepare cursor pointer 264 + var cursorPtr *int64 265 + if cursor > 0 { 266 + cursorPtr = &cursor 267 + } 268 + 269 + // Create and connect client 270 + client, err := jsclient.NewClient( 271 + config, 272 + slog.Default(), 273 + sched, 274 + ) 275 + if err != nil { 276 + return fmt.Errorf("create jetstream client: %w", err) 277 + } 278 + 279 + // Start reading from Jetstream 280 + return client.ConnectAndRead(ctx, cursorPtr) 281 + }
+11 -11
xrpc/feed/getPostThread.go
··· 15 15 func HandleGetPostThread(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 16 16 uriParam := c.QueryParam("uri") 17 17 if uriParam == "" { 18 - return c.JSON(http.StatusBadRequest, map[string]interface{}{ 18 + return c.JSON(http.StatusBadRequest, map[string]any{ 19 19 "error": "InvalidRequest", 20 20 "message": "uri parameter is required", 21 21 }) ··· 27 27 // Hydrate the requested post 28 28 postInfo, err := hydrator.HydratePost(ctx, uriParam, viewer) 29 29 if err != nil { 30 - return c.JSON(http.StatusNotFound, map[string]interface{}{ 30 + return c.JSON(http.StatusNotFound, map[string]any{ 31 31 "error": "NotFound", 32 32 "message": "post not found", 33 33 }) ··· 74 74 uri: uri, 75 75 replyTo: tp.ReplyTo, 76 76 inThread: tp.InThread, 77 - replies: []interface{}{}, 77 + replies: []any{}, 78 78 } 79 79 } 80 80 ··· 98 98 } 99 99 100 100 if rootNode == nil { 101 - return c.JSON(http.StatusNotFound, map[string]interface{}{ 101 + return c.JSON(http.StatusNotFound, map[string]any{ 102 102 "error": "NotFound", 103 103 "message": "thread root not found", 104 104 }) ··· 107 107 // Build the response by traversing the tree 108 108 thread := buildThreadView(ctx, db, rootNode, postsByID, hydrator, viewer, nil) 109 109 110 - return c.JSON(http.StatusOK, map[string]interface{}{ 110 + return c.JSON(http.StatusOK, map[string]any{ 111 111 "thread": thread, 112 112 }) 113 113 } ··· 117 117 uri string 118 118 replyTo uint 119 119 inThread uint 120 - replies []interface{} 120 + replies []any 121 121 } 122 122 123 - func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent interface{}) interface{} { 123 + func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent any) any { 124 124 // Hydrate this post 125 125 postInfo, err := 
hydrator.HydratePost(ctx, node.uri, viewer) 126 126 if err != nil { 127 127 // Return a notFound post 128 - return map[string]interface{}{ 128 + return map[string]any{ 129 129 "$type": "app.bsky.feed.defs#notFoundPost", 130 130 "uri": node.uri, 131 131 } ··· 134 134 // Hydrate author 135 135 authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author) 136 136 if err != nil { 137 - return map[string]interface{}{ 137 + return map[string]any{ 138 138 "$type": "app.bsky.feed.defs#notFoundPost", 139 139 "uri": node.uri, 140 140 } 141 141 } 142 142 143 143 // Build replies 144 - var replies []interface{} 144 + var replies []any 145 145 for _, replyNode := range node.replies { 146 146 if rn, ok := replyNode.(*threadPostNode); ok { 147 147 replyView := buildThreadView(ctx, db, rn, allNodes, hydrator, viewer, nil) ··· 150 150 } 151 151 152 152 // Build the thread view post 153 - var repliesForView interface{} 153 + var repliesForView any 154 154 if len(replies) > 0 { 155 155 repliesForView = replies 156 156 }
+12
xrpc/notification/listNotifications.go
··· 131 131 cursorPtr = &cursor 132 132 } 133 133 134 + var lastSeen time.Time 135 + if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = (select id from repos where did = ?)", viewer).Scan(&lastSeen).Error; err != nil { 136 + return err 137 + } 138 + 139 + var lastSeenStr *string 140 + if !lastSeen.IsZero() { 141 + s := lastSeen.Format(time.RFC3339) 142 + lastSeenStr = &s 143 + } 144 + 134 145 output := &bsky.NotificationListNotifications_Output{ 135 146 Notifications: notifications, 136 147 Cursor: cursorPtr, 148 + SeenAt: lastSeenStr, 137 149 } 138 150 139 151 return c.JSON(http.StatusOK, output)
+159 -131
xrpc/unspecced/getPostThreadV2.go
··· 1 1 package unspecced 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "fmt" 6 7 "log/slog" 7 8 "net/http" 8 9 "strconv" 10 + "sync" 9 11 10 12 "github.com/bluesky-social/indigo/api/bsky" 11 13 "github.com/labstack/echo/v4" 12 14 "github.com/whyrusleeping/konbini/hydration" 13 15 "github.com/whyrusleeping/konbini/views" 16 + "github.com/whyrusleeping/market/models" 17 + "go.opentelemetry.io/otel" 14 18 "gorm.io/gorm" 15 19 ) 16 20 21 + var tracer = otel.Tracer("xrpc/unspecced") 22 + 17 23 // HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2 18 24 func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 19 - ctx := c.Request().Context() 25 + ctx, span := tracer.Start(c.Request().Context(), "getPostThreadV2") 26 + defer span.End() 20 27 ctx = context.WithValue(ctx, "auto-fetch", true) 21 28 22 29 // Parse parameters ··· 69 76 }) 70 77 } 71 78 72 - // Determine the root post ID for the thread 73 - rootPostID := anchorPostInfo.InThread 74 - if rootPostID == 0 { 75 - // This post is the root - get its ID 76 - var postID uint 77 - db.Raw(` 78 - SELECT id FROM posts 79 - WHERE author = (SELECT id FROM repos WHERE did = ?) 80 - AND rkey = ? 81 - `, extractDIDFromURI(anchorUri), extractRkeyFromURI(anchorUri)).Scan(&postID) 82 - rootPostID = postID 79 + threadID := anchorPostInfo.InThread 80 + if threadID == 0 { 81 + threadID = anchorPostInfo.ID 83 82 } 84 83 85 - // Query all posts in this thread 86 - type threadPostRow struct { 87 - ID uint 88 - Rkey string 89 - ReplyTo uint 90 - InThread uint 91 - AuthorDid string 84 + var threadPosts []*models.Post 85 + if err := db.Raw("SELECT * FROM posts WHERE in_thread = ? OR id = ?", threadID, anchorPostInfo.ID).Scan(&threadPosts).Error; err != nil { 86 + return err 92 87 } 93 - var threadPosts []threadPostRow 94 - db.Raw(` 95 - SELECT p.id, p.rkey, p.reply_to, p.in_thread, r.did as author_did 96 - FROM posts p 97 - JOIN repos r ON r.id = p.author 98 - WHERE (p.id = ? 
OR p.in_thread = ?) 99 - AND p.not_found = false 100 - ORDER BY p.created ASC 101 - `, rootPostID, rootPostID).Scan(&threadPosts) 102 88 103 - // Build a map of posts by ID 104 - postsByID := make(map[uint]*threadNode) 105 - for _, tp := range threadPosts { 106 - uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", tp.AuthorDid, tp.Rkey) 107 - postsByID[tp.ID] = &threadNode{ 108 - id: tp.ID, 109 - uri: uri, 110 - replyTo: tp.ReplyTo, 111 - inThread: tp.InThread, 112 - children: []*threadNode{}, 113 - } 114 - } 89 + fmt.Println("GOT THREAD POSTS: ", len(threadPosts)) 115 90 116 - // Build parent-child relationships 117 - for _, node := range postsByID { 118 - if node.replyTo != 0 { 119 - parent := postsByID[node.replyTo] 120 - if parent != nil { 121 - parent.children = append(parent.children, node) 122 - } 123 - } 91 + treeNodes, err := buildThreadTree(ctx, hydrator, db, threadPosts) 92 + if err != nil { 93 + return fmt.Errorf("failed to construct tree: %w", err) 124 94 } 125 95 126 - // Find the anchor node 127 - anchorID := uint(0) 128 - for id, node := range postsByID { 129 - if node.uri == anchorUri { 130 - anchorID = id 131 - break 132 - } 133 - } 134 - 135 - if anchorID == 0 { 136 - return c.JSON(http.StatusNotFound, map[string]interface{}{ 137 - "error": "NotFound", 138 - "message": "anchor post not found in thread", 139 - }) 140 - } 141 - 142 - anchorNode := postsByID[anchorID] 96 + anchor := treeNodes[anchorPostInfo.ID] 143 97 144 98 // Build flat thread items list 145 99 var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem ··· 147 101 148 102 // Add parents if requested 149 103 if above { 150 - parents := collectParents(anchorNode, postsByID) 151 - for i := len(parents) - 1; i >= 0; i-- { 152 - depth := int64(-(len(parents) - i)) 153 - item := buildThreadItem(ctx, hydrator, parents[i], depth, viewer) 104 + parent := anchor.parent 105 + depth := int64(-1) 106 + for parent != nil { 107 + if parent.missing { 108 + fmt.Println("Parent missing: ", depth) 
109 + item := &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 110 + Depth: depth, 111 + Uri: parent.uri, 112 + Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{ 113 + UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{ 114 + LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound", 115 + }, 116 + }, 117 + } 118 + 119 + threadItems = append(threadItems, item) 120 + break 121 + } 122 + 123 + item := buildThreadItem(ctx, hydrator, parent, depth, viewer) 154 124 if item != nil { 155 125 threadItems = append(threadItems, item) 156 126 } 127 + 128 + parent = parent.parent 129 + depth-- 157 130 } 158 131 } 159 132 160 133 // Add anchor post (depth 0) 161 - anchorItem := buildThreadItem(ctx, hydrator, anchorNode, 0, viewer) 134 + anchorItem := buildThreadItem(ctx, hydrator, anchor, 0, viewer) 162 135 if anchorItem != nil { 163 136 threadItems = append(threadItems, anchorItem) 164 137 } 165 138 166 139 // Add replies below anchor 167 140 if below > 0 { 168 - replies, hasMore := collectReplies(ctx, hydrator, anchorNode, 1, below, branchingFactor, sort, viewer) 141 + replies, err := collectReplies(ctx, hydrator, anchor, 0, below, branchingFactor, sort, viewer) 142 + if err != nil { 143 + return err 144 + } 169 145 threadItems = append(threadItems, replies...) 
170 - hasOtherReplies = hasMore 146 + //hasOtherReplies = hasMore 171 147 } 172 148 173 149 return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{ ··· 176 152 }) 177 153 } 178 154 179 - type threadNode struct { 180 - id uint 181 - uri string 182 - replyTo uint 183 - inThread uint 184 - children []*threadNode 185 - } 155 + func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, curnode *threadTree, depth int64, below int64, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, error) { 156 + if below == 0 { 157 + return nil, nil 158 + } 186 159 187 - func collectParents(node *threadNode, allNodes map[uint]*threadNode) []*threadNode { 188 - var parents []*threadNode 189 - current := node 190 - for current.replyTo != 0 { 191 - parent := allNodes[current.replyTo] 192 - if parent == nil { 193 - break 194 - } 195 - parents = append(parents, parent) 196 - current = parent 160 + type parThreadResults struct { 161 + node *bsky.UnspeccedGetPostThreadV2_ThreadItem 162 + children []*bsky.UnspeccedGetPostThreadV2_ThreadItem 197 163 } 198 - return parents 199 - } 164 + 165 + results := make([]parThreadResults, len(curnode.children)) 166 + 167 + var wg sync.WaitGroup 168 + for i := range curnode.children { 169 + ix := i 170 + wg.Go(func() { 171 + child := curnode.children[ix] 172 + 173 + results[ix].node = buildThreadItem(ctx, hydrator, child, depth+1, viewer) 174 + if child.missing { 175 + return 176 + } 200 177 201 - func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, currentDepth, maxDepth, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, bool) { 202 - var items []*bsky.UnspeccedGetPostThreadV2_ThreadItem 203 - hasMore := false 178 + sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer) 179 + if err != nil { 180 + slog.Error("failed to collect replies", "node", child.uri, "error", 
err) 181 + return 182 + } 204 183 205 - if currentDepth > maxDepth { 206 - return items, false 184 + results[ix].children = sub 185 + }) 207 186 } 208 187 209 - // Sort children based on sort parameter 210 - children := node.children 211 - // TODO: Actually sort based on the sort parameter (newest/oldest/top) 212 - // For now, just use the order we have 188 + wg.Wait() 213 189 214 - // Limit to branchingFactor 215 - limit := int(branchingFactor) 216 - if len(children) > limit { 217 - hasMore = true 218 - children = children[:limit] 190 + var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem 191 + for _, res := range results { 192 + out = append(out, res.node) 193 + out = append(out, res.children...) 219 194 } 220 195 221 - for _, child := range children { 222 - item := buildThreadItem(ctx, hydrator, child, currentDepth, viewer) 223 - if item != nil { 224 - items = append(items, item) 196 + return out, nil 197 + } 225 198 226 - // Recursively collect replies 227 - if currentDepth < maxDepth { 228 - childReplies, childHasMore := collectReplies(ctx, hydrator, child, currentDepth+1, maxDepth, branchingFactor, sort, viewer) 229 - items = append(items, childReplies...) 
230 - if childHasMore { 231 - hasMore = true 232 - } 233 - } 199 + func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadTree, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem { 200 + if node.missing { 201 + return &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 202 + Depth: depth, 203 + Uri: node.uri, 204 + Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{ 205 + UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{ 206 + LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound", 207 + }, 208 + }, 234 209 } 235 210 } 236 211 237 - return items, hasMore 238 - } 239 - 240 - func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem { 241 212 // Hydrate the post 242 - postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer) 213 + postInfo, err := hydrator.HydratePostDB(ctx, node.uri, node.val, viewer) 243 214 if err != nil { 215 + slog.Error("failed to hydrate post in thread item", "uri", node.uri, "error", err) 244 216 // Return not found item 245 217 return &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 246 218 Depth: depth, ··· 256 228 // Hydrate author 257 229 authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author) 258 230 if err != nil { 231 + slog.Error("failed to hydrate actor in thread item", "author", postInfo.Author, "error", err) 259 232 return &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 260 233 Depth: depth, 261 234 Uri: node.uri, ··· 319 292 return string(parts) 320 293 } 321 294 322 - func extractRkeyFromURI(uri string) string { 323 - // URI format: at://did:plc:xxx/collection/rkey 324 - if len(uri) < 5 || uri[:5] != "at://" { 325 - return "" 295 + type threadTree struct { 296 + parent *threadTree 297 + children []*threadTree 298 + 299 + val *models.Post 300 + 301 + missing bool 302 + 303 + uri string 304 + cid string 305 + } 306 + 307 + func buildThreadTree(ctx 
context.Context, hydrator *hydration.Hydrator, db *gorm.DB, posts []*models.Post) (map[uint]*threadTree, error) { 308 + nodes := make(map[uint]*threadTree) 309 + for _, p := range posts { 310 + puri, err := hydrator.UriForPost(ctx, p) 311 + if err != nil { 312 + return nil, err 313 + } 314 + 315 + t := &threadTree{ 316 + val: p, 317 + uri: puri, 318 + } 319 + 320 + nodes[p.ID] = t 326 321 } 327 - // Find last slash 328 - for i := len(uri) - 1; i >= 5; i-- { 329 - if uri[i] == '/' { 330 - return uri[i+1:] 322 + 323 + missing := make(map[uint]*threadTree) 324 + for _, node := range nodes { 325 + if node.val.ReplyTo == 0 { 326 + continue 331 327 } 328 + 329 + pnode, ok := nodes[node.val.ReplyTo] 330 + if !ok { 331 + pnode = &threadTree{ 332 + missing: true, 333 + } 334 + missing[node.val.ReplyTo] = pnode 335 + 336 + var bspost bsky.FeedPost 337 + if err := bspost.UnmarshalCBOR(bytes.NewReader(node.val.Raw)); err != nil { 338 + return nil, err 339 + } 340 + 341 + if bspost.Reply == nil || bspost.Reply.Parent == nil { 342 + return nil, fmt.Errorf("node with parent had no parent in object") 343 + } 344 + 345 + pnode.uri = bspost.Reply.Parent.Uri 346 + pnode.cid = bspost.Reply.Parent.Cid 347 + 348 + /* Maybe we could force hydrate these? 349 + hydrator.AddMissingRecord(puri, true) 350 + */ 351 + } 352 + 353 + pnode.children = append(pnode.children, node) 354 + node.parent = pnode 332 355 } 333 - return "" 356 + 357 + for k, v := range missing { 358 + nodes[k] = v 359 + } 360 + 361 + return nodes, nil 334 362 }