A locally focused bluesky appview

Compare changes

Choose any two refs to compare.

+38
README.md
··· 201 201 202 202 It will take a minute but it should pull all records from that user. 203 203 204 + ## Upstream Firehose Configuration 205 + 206 + Konbini supports both standard firehose endpoints and jetstream. If 207 + bandwidth and CPU usage are a concern, and you trust the jetstream endpoint, 208 + then it may be worth trying that out. 209 + 210 + The configuration file is formatted as follows: 211 + 212 + ```json 213 + { 214 + "backends": [ 215 + { 216 + "type": "jetstream", 217 + "host": "jetstream1.us-west.bsky.network" 218 + } 219 + ] 220 + } 221 + ``` 222 + 223 + The default (implicit) configuration file looks like this: 224 + 225 + ```json 226 + { 227 + "backends": [ 228 + { 229 + "type": "firehose", 230 + "host": "bsky.network" 231 + } 232 + ] 233 + } 234 + ``` 235 + 236 + Note that this is an array of backends: you can specify multiple upstreams, and 237 + konbini will read from all of them. The main intended purpose of this is to be 238 + able to subscribe directly to PDSs. PDSs currently only support the full 239 + firehose endpoint, not jetstream, so be sure to specify a type of "firehose" 240 + for individual PDS endpoints. 241 + 204 242 ## License 205 243 206 244 MIT (whyrusleeping)
+55 -7
backend/backend.go
··· 10 10 11 11 "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/api/bsky" 13 + "github.com/bluesky-social/indigo/atproto/identity" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/bluesky-social/indigo/util" 15 16 "github.com/bluesky-social/indigo/xrpc" ··· 26 27 27 28 // PostgresBackend handles database operations 28 29 type PostgresBackend struct { 29 - db *gorm.DB 30 - pgx *pgxpool.Pool 31 - tracker RecordTracker 30 + db *gorm.DB 31 + pgx *pgxpool.Pool 32 + 33 + dir identity.Directory 32 34 33 35 client *xrpc.Client 34 36 ··· 43 45 repoCache *lru.TwoQueueCache[string, *Repo] 44 46 reposLk sync.Mutex 45 47 48 + didByIDCache *lru.TwoQueueCache[uint, string] 49 + 46 50 postInfoCache *lru.TwoQueueCache[string, cachedPostInfo] 51 + 52 + missingRecords chan MissingRecord 47 53 } 48 54 49 55 type cachedPostInfo struct { ··· 52 58 } 53 59 54 60 // NewPostgresBackend creates a new PostgresBackend 55 - func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, tracker RecordTracker) (*PostgresBackend, error) { 61 + func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) { 56 62 rc, _ := lru.New2Q[string, *Repo](1_000_000) 57 63 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 58 64 revc, _ := lru.New2Q[uint, string](1_000_000) 65 + dbic, _ := lru.New2Q[uint, string](1_000_000) 59 66 60 67 b := &PostgresBackend{ 61 68 client: client, 62 69 mydid: mydid, 63 70 db: db, 64 71 pgx: pgx, 65 - tracker: tracker, 66 72 relevantDids: make(map[string]bool), 67 73 repoCache: rc, 68 74 postInfoCache: pc, 69 75 revCache: revc, 76 + didByIDCache: dbic, 77 + dir: dir, 78 + 79 + missingRecords: make(chan MissingRecord, 1000), 70 80 } 71 81 72 82 r, err := b.GetOrCreateRepo(context.TODO(), mydid) ··· 75 85 } 76 86 77 87 b.myrepo = r 88 + 89 + go b.missingRecordFetcher() 78 90 return b, nil 79 91 } 80 92 81 
93 // TrackMissingRecord implements the RecordTracker interface 82 94 func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) { 83 - if b.tracker != nil { 84 - b.tracker.TrackMissingRecord(identifier, wait) 95 + mr := MissingRecord{ 96 + Type: mrTypeFromIdent(identifier), 97 + Identifier: identifier, 98 + Wait: wait, 99 + } 100 + 101 + b.addMissingRecord(context.TODO(), mr) 102 + } 103 + 104 + func mrTypeFromIdent(ident string) MissingRecordType { 105 + if strings.HasPrefix(ident, "did:") { 106 + return MissingRecordTypeProfile 107 + } 108 + 109 + puri, _ := syntax.ParseATURI(ident) 110 + switch puri.Collection().String() { 111 + case "app.bsky.feed.post": 112 + return MissingRecordTypePost 113 + case "app.bsky.feed.generator": 114 + return MissingRecordTypeFeedGenerator 115 + default: 116 + return MissingRecordTypeUnknown 85 117 } 118 + 86 119 } 87 120 88 121 // DidToID converts a DID to a database ID ··· 363 396 } 364 397 365 398 return &r, nil 399 + } 400 + 401 + func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) { 402 + val, ok := b.didByIDCache.Get(uid) 403 + if ok { 404 + return val, nil 405 + } 406 + 407 + r, err := b.GetRepoByID(ctx, uid) 408 + if err != nil { 409 + return "", err 410 + } 411 + 412 + b.didByIDCache.Add(uid, r.Did) 413 + return r.Did, nil 366 414 } 367 415 368 416 func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
+1208
backend/events.go
··· 1 + package backend 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "strings" 10 + "time" 11 + 12 + "github.com/bluesky-social/indigo/api/atproto" 13 + "github.com/bluesky-social/indigo/api/bsky" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + lexutil "github.com/bluesky-social/indigo/lex/util" 16 + "github.com/bluesky-social/indigo/repo" 17 + jsmodels "github.com/bluesky-social/jetstream/pkg/models" 18 + "github.com/ipfs/go-cid" 19 + "github.com/jackc/pgx/v5/pgconn" 20 + "github.com/prometheus/client_golang/prometheus" 21 + "github.com/prometheus/client_golang/prometheus/promauto" 22 + 23 + . "github.com/whyrusleeping/konbini/models" 24 + ) 25 + 26 + var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{ 27 + Name: "handle_op_duration", 28 + Help: "A histogram of op handling durations", 29 + Buckets: prometheus.ExponentialBuckets(1, 2, 15), 30 + }, []string{"op", "collection"}) 31 + 32 + func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { 33 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 34 + if err != nil { 35 + return fmt.Errorf("failed to read event repo: %w", err) 36 + } 37 + 38 + for _, op := range evt.Ops { 39 + switch op.Action { 40 + case "create": 41 + c, rec, err := r.GetRecordBytes(ctx, op.Path) 42 + if err != nil { 43 + return err 44 + } 45 + if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil { 46 + return fmt.Errorf("create record failed: %w", err) 47 + } 48 + case "update": 49 + c, rec, err := r.GetRecordBytes(ctx, op.Path) 50 + if err != nil { 51 + return err 52 + } 53 + if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil { 54 + return fmt.Errorf("update record failed: %w", err) 55 + } 56 + case "delete": 57 + if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil { 58 + return fmt.Errorf("delete record failed: %w", err) 59 + } 60 + 
} 61 + } 62 + 63 + // TODO: sync with the Since field to make sure we don't miss events we care about 64 + /* 65 + if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { 66 + return fmt.Errorf("failed to update rev: %w", err) 67 + } 68 + */ 69 + 70 + return nil 71 + } 72 + 73 + func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) { 74 + val, err := lexutil.NewFromType(evt.Commit.Collection) 75 + if err != nil { 76 + return nil, fmt.Errorf("failed to load event record type: %w", err) 77 + } 78 + 79 + if err := json.Unmarshal(evt.Commit.Record, val); err != nil { 80 + return nil, err 81 + } 82 + 83 + cval, ok := val.(lexutil.CBOR) 84 + if !ok { 85 + return nil, fmt.Errorf("decoded type was not cbor marshalable") 86 + } 87 + 88 + buf := new(bytes.Buffer) 89 + if err := cval.MarshalCBOR(buf); err != nil { 90 + return nil, fmt.Errorf("failed to marshal event to cbor: %w", err) 91 + } 92 + 93 + rec := buf.Bytes() 94 + return rec, nil 95 + } 96 + 97 + func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error { 98 + 99 + path := evt.Commit.Collection + "/" + evt.Commit.RKey 100 + switch evt.Commit.Operation { 101 + case jsmodels.CommitOperationCreate: 102 + rec, err := cborBytesFromEvent(evt) 103 + if err != nil { 104 + return err 105 + } 106 + 107 + c, err := cid.Decode(evt.Commit.CID) 108 + if err != nil { 109 + return err 110 + } 111 + 112 + if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 113 + return fmt.Errorf("create record failed: %w", err) 114 + } 115 + case jsmodels.CommitOperationUpdate: 116 + rec, err := cborBytesFromEvent(evt) 117 + if err != nil { 118 + return err 119 + } 120 + 121 + c, err := cid.Decode(evt.Commit.CID) 122 + if err != nil { 123 + return err 124 + } 125 + 126 + if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 127 + return fmt.Errorf("update record failed: %w", err) 128 + } 129 + case jsmodels.CommitOperationDelete: 
130 + if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil { 131 + return fmt.Errorf("delete record failed: %w", err) 132 + } 133 + } 134 + 135 + return nil 136 + } 137 + 138 + func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error { 139 + start := time.Now() 140 + 141 + rr, err := b.GetOrCreateRepo(ctx, repo) 142 + if err != nil { 143 + return fmt.Errorf("get user failed: %w", err) 144 + } 145 + 146 + lrev, err := b.revForRepo(rr) 147 + if err != nil { 148 + return err 149 + } 150 + if lrev != "" { 151 + if rev < lrev { 152 + slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path) 153 + return nil 154 + } 155 + } 156 + 157 + parts := strings.Split(path, "/") 158 + if len(parts) != 2 { 159 + return fmt.Errorf("invalid path in HandleCreate: %q", path) 160 + } 161 + col := parts[0] 162 + rkey := parts[1] 163 + 164 + defer func() { 165 + handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds())) 166 + }() 167 + 168 + if rkey == "" { 169 + fmt.Printf("messed up path: %q\n", rkey) 170 + } 171 + 172 + switch col { 173 + case "app.bsky.feed.post": 174 + if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil { 175 + return err 176 + } 177 + case "app.bsky.feed.like": 178 + if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil { 179 + return err 180 + } 181 + case "app.bsky.feed.repost": 182 + if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil { 183 + return err 184 + } 185 + case "app.bsky.graph.follow": 186 + if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil { 187 + return err 188 + } 189 + case "app.bsky.graph.block": 190 + if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil { 191 + return err 192 + } 193 + case "app.bsky.graph.list": 194 + if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil { 195 + return 
err 196 + } 197 + case "app.bsky.graph.listitem": 198 + if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil { 199 + return err 200 + } 201 + case "app.bsky.graph.listblock": 202 + if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil { 203 + return err 204 + } 205 + case "app.bsky.actor.profile": 206 + if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil { 207 + return err 208 + } 209 + case "app.bsky.feed.generator": 210 + if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil { 211 + return err 212 + } 213 + case "app.bsky.feed.threadgate": 214 + if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil { 215 + return err 216 + } 217 + case "chat.bsky.actor.declaration": 218 + if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil { 219 + return err 220 + } 221 + case "app.bsky.feed.postgate": 222 + if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil { 223 + return err 224 + } 225 + case "app.bsky.graph.starterpack": 226 + if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil { 227 + return err 228 + } 229 + default: 230 + slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev) 231 + } 232 + 233 + b.revCache.Add(rr.ID, rev) 234 + return nil 235 + } 236 + 237 + func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 238 + exists, err := b.checkPostExists(ctx, repo, rkey) 239 + if err != nil { 240 + return err 241 + } 242 + 243 + // still technically a race condition if two creates for the same post happen concurrently... 
probably fine 244 + if exists { 245 + return nil 246 + } 247 + 248 + var rec bsky.FeedPost 249 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 250 + uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey 251 + slog.Warn("skipping post with malformed data", "uri", uri, "error", err) 252 + return nil // Skip this post rather than failing the entire event 253 + } 254 + 255 + reldids := []string{repo.Did} 256 + // care about a post if its in a thread of a user we are interested in 257 + if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 258 + reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri) 259 + } 260 + // TODO: maybe also care if its mentioning a user we care about or quoting a user we care about? 261 + if !b.anyRelevantIdents(reldids...) { 262 + return nil 263 + } 264 + 265 + uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey 266 + slog.Warn("adding post", "uri", uri) 267 + 268 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 269 + if err != nil { 270 + return fmt.Errorf("invalid timestamp: %w", err) 271 + } 272 + 273 + p := Post{ 274 + Created: created.Time(), 275 + Indexed: time.Now(), 276 + Author: repo.ID, 277 + Rkey: rkey, 278 + Raw: recb, 279 + Cid: cc.String(), 280 + } 281 + 282 + if rec.Reply != nil && rec.Reply.Parent != nil { 283 + if rec.Reply.Root == nil { 284 + return fmt.Errorf("post reply had nil root") 285 + } 286 + 287 + pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri) 288 + if err != nil { 289 + return fmt.Errorf("getting reply parent: %w", err) 290 + } 291 + 292 + p.ReplyTo = pinfo.ID 293 + p.ReplyToUsr = pinfo.Author 294 + 295 + thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri) 296 + if err != nil { 297 + return fmt.Errorf("getting thread root: %w", err) 298 + } 299 + 300 + p.InThread = thread 301 + 302 + r, err := b.GetOrCreateRepo(ctx, b.mydid) 303 + if err != nil { 304 + return err 305 + } 306 + 307 + if p.ReplyToUsr == r.ID { 308 + if err := 
b.AddNotification(ctx, r.ID, p.Author, uri, cc, NotifKindReply); err != nil { 309 + slog.Warn("failed to create notification", "uri", uri, "error", err) 310 + } 311 + } 312 + } 313 + 314 + if rec.Embed != nil { 315 + var rpref string 316 + if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 317 + rpref = rec.Embed.EmbedRecord.Record.Uri 318 + } 319 + if rec.Embed.EmbedRecordWithMedia != nil && 320 + rec.Embed.EmbedRecordWithMedia.Record != nil && 321 + rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 322 + rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 323 + } 324 + 325 + if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") { 326 + rp, err := b.postIDForUri(ctx, rpref) 327 + if err != nil { 328 + return fmt.Errorf("getting quote subject: %w", err) 329 + } 330 + 331 + p.Reposting = rp 332 + } 333 + } 334 + 335 + if err := b.doPostCreate(ctx, &p); err != nil { 336 + return err 337 + } 338 + 339 + // Check for mentions and create notifications 340 + if rec.Facets != nil { 341 + for _, facet := range rec.Facets { 342 + for _, feature := range facet.Features { 343 + if feature.RichtextFacet_Mention != nil { 344 + mentionDid := feature.RichtextFacet_Mention.Did 345 + // This is a mention 346 + mentionedRepo, err := b.GetOrCreateRepo(ctx, mentionDid) 347 + if err != nil { 348 + slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err) 349 + continue 350 + } 351 + 352 + // Create notification if the mentioned user is the current user 353 + if mentionedRepo.ID == b.myrepo.ID { 354 + if err := b.AddNotification(ctx, b.myrepo.ID, p.Author, uri, cc, NotifKindMention); err != nil { 355 + slog.Warn("failed to create mention notification", "uri", uri, "error", err) 356 + } 357 + } 358 + } 359 + } 360 + } 361 + } 362 + 363 + b.postInfoCache.Add(uri, cachedPostInfo{ 364 + ID: p.ID, 365 + Author: p.Author, 366 + }) 367 + 368 + return nil 369 + } 370 + 371 + func (b *PostgresBackend) doPostCreate(ctx context.Context, p 
*Post) error { 372 + /* 373 + if err := b.db.Clauses(clause.OnConflict{ 374 + Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}}, 375 + DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}), 376 + }).Create(p).Error; err != nil { 377 + return err 378 + } 379 + */ 380 + 381 + query := ` 382 + INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread) 383 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 384 + ON CONFLICT (author, rkey) 385 + DO UPDATE SET 386 + cid = $3, 387 + not_found = $4, 388 + raw = $5, 389 + created = $6, 390 + indexed = $7, 391 + reposting = $8, 392 + reply_to = $9, 393 + reply_to_usr = $10, 394 + in_thread = $11 395 + RETURNING id 396 + ` 397 + 398 + // Execute the query with parameters from the Post struct 399 + if err := b.pgx.QueryRow( 400 + ctx, 401 + query, 402 + p.Author, 403 + p.Rkey, 404 + p.Cid, 405 + p.NotFound, 406 + p.Raw, 407 + p.Created, 408 + p.Indexed, 409 + p.Reposting, 410 + p.ReplyTo, 411 + p.ReplyToUsr, 412 + p.InThread, 413 + ).Scan(&p.ID); err != nil { 414 + return err 415 + } 416 + 417 + return nil 418 + } 419 + 420 + func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 421 + var rec bsky.FeedLike 422 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 423 + return err 424 + } 425 + 426 + if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) { 427 + return nil 428 + } 429 + 430 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 431 + if err != nil { 432 + return fmt.Errorf("invalid timestamp: %w", err) 433 + } 434 + 435 + pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri) 436 + if err != nil { 437 + return fmt.Errorf("getting like subject: %w", err) 438 + } 439 + 440 + if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, 
created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil { 441 + pgErr, ok := err.(*pgconn.PgError) 442 + if ok && pgErr.Code == "23505" { 443 + return nil 444 + } 445 + return err 446 + } 447 + 448 + // Create notification if the liked post belongs to the current user 449 + if pinfo.Author == b.myrepo.ID { 450 + uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey) 451 + if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindLike); err != nil { 452 + slog.Warn("failed to create like notification", "uri", uri, "error", err) 453 + } 454 + } 455 + 456 + return nil 457 + } 458 + 459 + func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 460 + var rec bsky.FeedRepost 461 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 462 + return err 463 + } 464 + 465 + if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) { 466 + return nil 467 + } 468 + 469 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 470 + if err != nil { 471 + return fmt.Errorf("invalid timestamp: %w", err) 472 + } 473 + 474 + pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri) 475 + if err != nil { 476 + return fmt.Errorf("getting repost subject: %w", err) 477 + } 478 + 479 + if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil { 480 + pgErr, ok := err.(*pgconn.PgError) 481 + if ok && pgErr.Code == "23505" { 482 + return nil 483 + } 484 + return err 485 + } 486 + 487 + // Create notification if the reposted post belongs to the current user 488 + if pinfo.Author == b.myrepo.ID { 489 + uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey) 490 + if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindRepost); err != nil { 491 + slog.Warn("failed to create repost notification", "uri", uri, 
"error", err) 492 + } 493 + } 494 + 495 + return nil 496 + } 497 + 498 + func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 499 + var rec bsky.GraphFollow 500 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 501 + return err 502 + } 503 + 504 + if !b.anyRelevantIdents(repo.Did, rec.Subject) { 505 + return nil 506 + } 507 + 508 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 509 + if err != nil { 510 + return fmt.Errorf("invalid timestamp: %w", err) 511 + } 512 + 513 + subj, err := b.GetOrCreateRepo(ctx, rec.Subject) 514 + if err != nil { 515 + return err 516 + } 517 + 518 + if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil { 519 + return err 520 + } 521 + 522 + return nil 523 + } 524 + 525 + func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 526 + var rec bsky.GraphBlock 527 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 528 + return err 529 + } 530 + 531 + if !b.anyRelevantIdents(repo.Did, rec.Subject) { 532 + return nil 533 + } 534 + 535 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 536 + if err != nil { 537 + return fmt.Errorf("invalid timestamp: %w", err) 538 + } 539 + 540 + subj, err := b.GetOrCreateRepo(ctx, rec.Subject) 541 + if err != nil { 542 + return err 543 + } 544 + 545 + if err := b.db.Create(&Block{ 546 + Created: created.Time(), 547 + Indexed: time.Now(), 548 + Author: repo.ID, 549 + Rkey: rkey, 550 + Subject: subj.ID, 551 + }).Error; err != nil { 552 + return err 553 + } 554 + 555 + return nil 556 + } 557 + 558 + func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 559 + var rec bsky.GraphList 560 + if err := 
rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 561 + return err 562 + } 563 + 564 + if !b.anyRelevantIdents(repo.Did) { 565 + return nil 566 + } 567 + 568 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 569 + if err != nil { 570 + return fmt.Errorf("invalid timestamp: %w", err) 571 + } 572 + 573 + if err := b.db.Create(&List{ 574 + Created: created.Time(), 575 + Indexed: time.Now(), 576 + Author: repo.ID, 577 + Rkey: rkey, 578 + Raw: recb, 579 + }).Error; err != nil { 580 + return err 581 + } 582 + 583 + return nil 584 + } 585 + 586 + func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 587 + var rec bsky.GraphListitem 588 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 589 + return err 590 + } 591 + if !b.anyRelevantIdents(repo.Did) { 592 + return nil 593 + } 594 + 595 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 596 + if err != nil { 597 + return fmt.Errorf("invalid timestamp: %w", err) 598 + } 599 + 600 + subj, err := b.GetOrCreateRepo(ctx, rec.Subject) 601 + if err != nil { 602 + return err 603 + } 604 + 605 + list, err := b.GetOrCreateList(ctx, rec.List) 606 + if err != nil { 607 + return err 608 + } 609 + 610 + if err := b.db.Create(&ListItem{ 611 + Created: created.Time(), 612 + Indexed: time.Now(), 613 + Author: repo.ID, 614 + Rkey: rkey, 615 + Subject: subj.ID, 616 + List: list.ID, 617 + }).Error; err != nil { 618 + return err 619 + } 620 + 621 + return nil 622 + } 623 + 624 + func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 625 + var rec bsky.GraphListblock 626 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 627 + return err 628 + } 629 + 630 + if !b.anyRelevantIdents(repo.Did, rec.Subject) { 631 + return nil 632 + } 633 + 634 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 635 + if err != nil { 636 + return 
fmt.Errorf("invalid timestamp: %w", err) 637 + } 638 + 639 + list, err := b.GetOrCreateList(ctx, rec.Subject) 640 + if err != nil { 641 + return err 642 + } 643 + 644 + if err := b.db.Create(&ListBlock{ 645 + Created: created.Time(), 646 + Indexed: time.Now(), 647 + Author: repo.ID, 648 + Rkey: rkey, 649 + List: list.ID, 650 + }).Error; err != nil { 651 + return err 652 + } 653 + 654 + return nil 655 + } 656 + 657 + func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error { 658 + if !b.anyRelevantIdents(repo.Did) { 659 + return nil 660 + } 661 + 662 + if err := b.db.Create(&Profile{ 663 + //Created: created.Time(), 664 + Indexed: time.Now(), 665 + Repo: repo.ID, 666 + Raw: recb, 667 + Rev: rev, 668 + }).Error; err != nil { 669 + return err 670 + } 671 + 672 + return nil 673 + } 674 + 675 + func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error { 676 + if !b.anyRelevantIdents(repo.Did) { 677 + return nil 678 + } 679 + 680 + if err := b.db.Create(&Profile{ 681 + Indexed: time.Now(), 682 + Repo: repo.ID, 683 + Raw: recb, 684 + Rev: rev, 685 + }).Error; err != nil { 686 + return err 687 + } 688 + 689 + return nil 690 + } 691 + 692 + func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 693 + if !b.anyRelevantIdents(repo.Did) { 694 + return nil 695 + } 696 + 697 + var rec bsky.FeedGenerator 698 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 699 + return err 700 + } 701 + 702 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 703 + if err != nil { 704 + return fmt.Errorf("invalid timestamp: %w", err) 705 + } 706 + 707 + if err := b.db.Create(&FeedGenerator{ 708 + Created: created.Time(), 709 + Indexed: time.Now(), 710 + Author: repo.ID, 711 + Rkey: rkey, 712 + Did: rec.Did, 713 + Raw: recb, 714 + }).Error; err != nil { 715 + 
return err 716 + } 717 + 718 + return nil 719 + } 720 + 721 + func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 722 + if !b.anyRelevantIdents(repo.Did) { 723 + return nil 724 + } 725 + var rec bsky.FeedThreadgate 726 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 727 + return err 728 + } 729 + 730 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 731 + if err != nil { 732 + return fmt.Errorf("invalid timestamp: %w", err) 733 + } 734 + 735 + pid, err := b.postIDForUri(ctx, rec.Post) 736 + if err != nil { 737 + return err 738 + } 739 + 740 + if err := b.db.Create(&ThreadGate{ 741 + Created: created.Time(), 742 + Indexed: time.Now(), 743 + Author: repo.ID, 744 + Rkey: rkey, 745 + Post: pid, 746 + }).Error; err != nil { 747 + return err 748 + } 749 + 750 + return nil 751 + } 752 + 753 + func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 754 + // TODO: maybe track these? 
755 + return nil 756 + } 757 + 758 + func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 759 + if !b.anyRelevantIdents(repo.Did) { 760 + return nil 761 + } 762 + var rec bsky.FeedPostgate 763 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 764 + return err 765 + } 766 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 767 + if err != nil { 768 + return fmt.Errorf("invalid timestamp: %w", err) 769 + } 770 + 771 + refPost, err := b.postInfoForUri(ctx, rec.Post) 772 + if err != nil { 773 + return err 774 + } 775 + 776 + if err := b.db.Create(&PostGate{ 777 + Created: created.Time(), 778 + Indexed: time.Now(), 779 + Author: repo.ID, 780 + Rkey: rkey, 781 + Subject: refPost.ID, 782 + Raw: recb, 783 + }).Error; err != nil { 784 + return err 785 + } 786 + 787 + return nil 788 + } 789 + 790 + func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 791 + if !b.anyRelevantIdents(repo.Did) { 792 + return nil 793 + } 794 + var rec bsky.GraphStarterpack 795 + if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 796 + return err 797 + } 798 + created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 799 + if err != nil { 800 + return fmt.Errorf("invalid timestamp: %w", err) 801 + } 802 + 803 + list, err := b.GetOrCreateList(ctx, rec.List) 804 + if err != nil { 805 + return err 806 + } 807 + 808 + if err := b.db.Create(&StarterPack{ 809 + Created: created.Time(), 810 + Indexed: time.Now(), 811 + Author: repo.ID, 812 + Rkey: rkey, 813 + Raw: recb, 814 + List: list.ID, 815 + }).Error; err != nil { 816 + return err 817 + } 818 + 819 + return nil 820 + } 821 + 822 + func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error { 823 + start := time.Now() 824 + 825 + rr, err := b.GetOrCreateRepo(ctx, repo) 826 + if err != nil { 827 
+ return fmt.Errorf("get user failed: %w", err) 828 + } 829 + 830 + lrev, err := b.revForRepo(rr) 831 + if err != nil { 832 + return err 833 + } 834 + if lrev != "" { 835 + if rev < lrev { 836 + //slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path) 837 + return nil 838 + } 839 + } 840 + 841 + parts := strings.Split(path, "/") 842 + if len(parts) != 2 { 843 + return fmt.Errorf("invalid path in HandleCreate: %q", path) 844 + } 845 + col := parts[0] 846 + rkey := parts[1] 847 + 848 + defer func() { 849 + handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds())) 850 + }() 851 + 852 + if rkey == "" { 853 + fmt.Printf("messed up path: %q\n", rkey) 854 + } 855 + 856 + switch col { 857 + /* 858 + case "app.bsky.feed.post": 859 + if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil { 860 + return err 861 + } 862 + case "app.bsky.feed.like": 863 + if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil { 864 + return err 865 + } 866 + case "app.bsky.feed.repost": 867 + if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil { 868 + return err 869 + } 870 + case "app.bsky.graph.follow": 871 + if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil { 872 + return err 873 + } 874 + case "app.bsky.graph.block": 875 + if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil { 876 + return err 877 + } 878 + case "app.bsky.graph.list": 879 + if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil { 880 + return err 881 + } 882 + case "app.bsky.graph.listitem": 883 + if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil { 884 + return err 885 + } 886 + case "app.bsky.graph.listblock": 887 + if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil { 888 + return err 889 + } 890 + */ 891 + case "app.bsky.actor.profile": 892 + if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != 
nil { 893 + return err 894 + } 895 + /* 896 + case "app.bsky.feed.generator": 897 + if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil { 898 + return err 899 + } 900 + case "app.bsky.feed.threadgate": 901 + if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil { 902 + return err 903 + } 904 + case "chat.bsky.actor.declaration": 905 + if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil { 906 + return err 907 + } 908 + */ 909 + default: 910 + slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev) 911 + } 912 + 913 + return nil 914 + } 915 + 916 + func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error { 917 + start := time.Now() 918 + 919 + rr, err := b.GetOrCreateRepo(ctx, repo) 920 + if err != nil { 921 + return fmt.Errorf("get user failed: %w", err) 922 + } 923 + 924 + lrev, ok := b.revCache.Get(rr.ID) 925 + if ok { 926 + if rev < lrev { 927 + //slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev) 928 + return nil 929 + } 930 + } 931 + 932 + parts := strings.Split(path, "/") 933 + if len(parts) != 2 { 934 + return fmt.Errorf("invalid path in HandleDelete: %q", path) 935 + } 936 + col := parts[0] 937 + rkey := parts[1] 938 + 939 + defer func() { 940 + handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds())) 941 + }() 942 + 943 + switch col { 944 + case "app.bsky.feed.post": 945 + if err := b.HandleDeletePost(ctx, rr, rkey); err != nil { 946 + return err 947 + } 948 + case "app.bsky.feed.like": 949 + if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil { 950 + return err 951 + } 952 + case "app.bsky.feed.repost": 953 + if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil { 954 + return err 955 + } 956 + case "app.bsky.graph.follow": 957 + if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil { 958 + return err 959 + } 960 + case 
"app.bsky.graph.block": 961 + if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil { 962 + return err 963 + } 964 + case "app.bsky.graph.list": 965 + if err := b.HandleDeleteList(ctx, rr, rkey); err != nil { 966 + return err 967 + } 968 + case "app.bsky.graph.listitem": 969 + if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil { 970 + return err 971 + } 972 + case "app.bsky.graph.listblock": 973 + if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil { 974 + return err 975 + } 976 + case "app.bsky.actor.profile": 977 + if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil { 978 + return err 979 + } 980 + case "app.bsky.feed.generator": 981 + if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil { 982 + return err 983 + } 984 + case "app.bsky.feed.threadgate": 985 + if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil { 986 + return err 987 + } 988 + default: 989 + slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev) 990 + } 991 + 992 + b.revCache.Add(rr.ID, rev) 993 + return nil 994 + } 995 + 996 + func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error { 997 + var p Post 998 + if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 999 + return err 1000 + } 1001 + 1002 + if p.ID == 0 { 1003 + //slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey) 1004 + return nil 1005 + } 1006 + 1007 + if err := b.db.Delete(&Post{}, p.ID).Error; err != nil { 1008 + return err 1009 + } 1010 + 1011 + return nil 1012 + } 1013 + 1014 + func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error { 1015 + var like Like 1016 + if err := b.db.Find(&like, "author = ? 
AND rkey = ?", repo.ID, rkey).Error; err != nil { 1017 + return err 1018 + } 1019 + 1020 + if like.ID == 0 { 1021 + //slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey) 1022 + return nil 1023 + } 1024 + 1025 + if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil { 1026 + return err 1027 + } 1028 + 1029 + return nil 1030 + } 1031 + 1032 + func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error { 1033 + var repost Repost 1034 + if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1035 + return err 1036 + } 1037 + 1038 + if repost.ID == 0 { 1039 + //return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey) 1040 + return nil 1041 + } 1042 + 1043 + if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil { 1044 + return err 1045 + } 1046 + 1047 + return nil 1048 + } 1049 + 1050 + func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error { 1051 + var follow Follow 1052 + if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1053 + return err 1054 + } 1055 + 1056 + if follow.ID == 0 { 1057 + //slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey) 1058 + return nil 1059 + } 1060 + 1061 + if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil { 1062 + return err 1063 + } 1064 + 1065 + return nil 1066 + } 1067 + 1068 + func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error { 1069 + var block Block 1070 + if err := b.db.Find(&block, "author = ? 
AND rkey = ?", repo.ID, rkey).Error; err != nil { 1071 + return err 1072 + } 1073 + 1074 + if block.ID == 0 { 1075 + //slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey) 1076 + return nil 1077 + } 1078 + 1079 + if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil { 1080 + return err 1081 + } 1082 + 1083 + return nil 1084 + } 1085 + 1086 + func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error { 1087 + var list List 1088 + if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1089 + return err 1090 + } 1091 + 1092 + if list.ID == 0 { 1093 + return nil 1094 + //return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey) 1095 + } 1096 + 1097 + if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil { 1098 + return err 1099 + } 1100 + 1101 + return nil 1102 + } 1103 + 1104 + func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error { 1105 + var item ListItem 1106 + if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1107 + return err 1108 + } 1109 + 1110 + if item.ID == 0 { 1111 + return nil 1112 + //return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey) 1113 + } 1114 + 1115 + if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil { 1116 + return err 1117 + } 1118 + 1119 + return nil 1120 + } 1121 + 1122 + func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error { 1123 + var block ListBlock 1124 + if err := b.db.Find(&block, "author = ? 
AND rkey = ?", repo.ID, rkey).Error; err != nil { 1125 + return err 1126 + } 1127 + 1128 + if block.ID == 0 { 1129 + return nil 1130 + //return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey) 1131 + } 1132 + 1133 + if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil { 1134 + return err 1135 + } 1136 + 1137 + return nil 1138 + } 1139 + 1140 + func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error { 1141 + var feedgen FeedGenerator 1142 + if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1143 + return err 1144 + } 1145 + 1146 + if feedgen.ID == 0 { 1147 + return nil 1148 + //return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey) 1149 + } 1150 + 1151 + if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil { 1152 + return err 1153 + } 1154 + 1155 + return nil 1156 + } 1157 + 1158 + func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error { 1159 + var threadgate ThreadGate 1160 + if err := b.db.Find(&threadgate, "author = ? 
AND rkey = ?", repo.ID, rkey).Error; err != nil { 1161 + return err 1162 + } 1163 + 1164 + if threadgate.ID == 0 { 1165 + return nil 1166 + //return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey) 1167 + } 1168 + 1169 + if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil { 1170 + return err 1171 + } 1172 + 1173 + return nil 1174 + } 1175 + 1176 + func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error { 1177 + var profile Profile 1178 + if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil { 1179 + return err 1180 + } 1181 + 1182 + if profile.ID == 0 { 1183 + return nil 1184 + } 1185 + 1186 + if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil { 1187 + return err 1188 + } 1189 + 1190 + return nil 1191 + } 1192 + 1193 + const ( 1194 + NotifKindReply = "reply" 1195 + NotifKindLike = "like" 1196 + NotifKindMention = "mention" 1197 + NotifKindRepost = "repost" 1198 + ) 1199 + 1200 + func (b *PostgresBackend) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error { 1201 + return b.db.Create(&Notification{ 1202 + For: forUser, 1203 + Author: author, 1204 + Source: recordUri, 1205 + SourceCid: recordCid.String(), 1206 + Kind: kind, 1207 + }).Error 1208 + }
+211
backend/missing.go
··· 1 + package backend 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + xrpclib "github.com/bluesky-social/indigo/xrpc" 13 + "github.com/ipfs/go-cid" 14 + ) 15 + 16 + type MissingRecordType string 17 + 18 + const ( 19 + MissingRecordTypeProfile MissingRecordType = "profile" 20 + MissingRecordTypePost MissingRecordType = "post" 21 + MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator" 22 + MissingRecordTypeUnknown MissingRecordType = "unknown" 23 + ) 24 + 25 + type MissingRecord struct { 26 + Type MissingRecordType 27 + Identifier string // DID for profiles, AT-URI for posts/feedgens 28 + Wait bool 29 + 30 + waitch chan struct{} 31 + } 32 + 33 + func (b *PostgresBackend) addMissingRecord(ctx context.Context, rec MissingRecord) { 34 + if rec.Wait { 35 + rec.waitch = make(chan struct{}) 36 + } 37 + 38 + select { 39 + case b.missingRecords <- rec: 40 + case <-ctx.Done(): 41 + } 42 + 43 + if rec.Wait { 44 + select { 45 + case <-rec.waitch: 46 + case <-ctx.Done(): 47 + } 48 + } 49 + } 50 + 51 + func (b *PostgresBackend) missingRecordFetcher() { 52 + for rec := range b.missingRecords { 53 + var err error 54 + switch rec.Type { 55 + case MissingRecordTypeProfile: 56 + err = b.fetchMissingProfile(context.TODO(), rec.Identifier) 57 + case MissingRecordTypePost: 58 + err = b.fetchMissingPost(context.TODO(), rec.Identifier) 59 + case MissingRecordTypeFeedGenerator: 60 + err = b.fetchMissingFeedGenerator(context.TODO(), rec.Identifier) 61 + default: 62 + slog.Error("unknown missing record type", "type", rec.Type) 63 + continue 64 + } 65 + 66 + if err != nil { 67 + slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err) 68 + } 69 + 70 + if rec.Wait { 71 + close(rec.waitch) 72 + } 73 + } 74 + } 75 + 76 + func (b *PostgresBackend) 
fetchMissingProfile(ctx context.Context, did string) error { 77 + b.AddRelevantDid(did) 78 + 79 + repo, err := b.GetOrCreateRepo(ctx, did) 80 + if err != nil { 81 + return err 82 + } 83 + 84 + resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 85 + if err != nil { 86 + return err 87 + } 88 + 89 + c := &xrpclib.Client{ 90 + Host: resp.PDSEndpoint(), 91 + } 92 + 93 + rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self") 94 + if err != nil { 95 + return err 96 + } 97 + 98 + prof, ok := rec.Value.Val.(*bsky.ActorProfile) 99 + if !ok { 100 + return fmt.Errorf("record we got back wasnt a profile somehow") 101 + } 102 + 103 + buf := new(bytes.Buffer) 104 + if err := prof.MarshalCBOR(buf); err != nil { 105 + return err 106 + } 107 + 108 + cc, err := cid.Decode(*rec.Cid) 109 + if err != nil { 110 + return err 111 + } 112 + 113 + return b.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc) 114 + } 115 + 116 + func (b *PostgresBackend) fetchMissingPost(ctx context.Context, uri string) error { 117 + puri, err := syntax.ParseATURI(uri) 118 + if err != nil { 119 + return fmt.Errorf("invalid AT URI: %s", uri) 120 + } 121 + 122 + did := puri.Authority().String() 123 + collection := puri.Collection().String() 124 + rkey := puri.RecordKey().String() 125 + 126 + b.AddRelevantDid(did) 127 + 128 + repo, err := b.GetOrCreateRepo(ctx, did) 129 + if err != nil { 130 + return err 131 + } 132 + 133 + resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 134 + if err != nil { 135 + return err 136 + } 137 + 138 + c := &xrpclib.Client{ 139 + Host: resp.PDSEndpoint(), 140 + } 141 + 142 + rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 143 + if err != nil { 144 + return err 145 + } 146 + 147 + post, ok := rec.Value.Val.(*bsky.FeedPost) 148 + if !ok { 149 + return fmt.Errorf("record we got back wasn't a post somehow") 150 + } 151 + 152 + buf := new(bytes.Buffer) 153 + if err := post.MarshalCBOR(buf); err != nil { 154 + return err 155 + } 
156 + 157 + cc, err := cid.Decode(*rec.Cid) 158 + if err != nil { 159 + return err 160 + } 161 + 162 + return b.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc) 163 + } 164 + 165 + func (b *PostgresBackend) fetchMissingFeedGenerator(ctx context.Context, uri string) error { 166 + puri, err := syntax.ParseATURI(uri) 167 + if err != nil { 168 + return fmt.Errorf("invalid AT URI: %s", uri) 169 + } 170 + 171 + did := puri.Authority().String() 172 + collection := puri.Collection().String() 173 + rkey := puri.RecordKey().String() 174 + b.AddRelevantDid(did) 175 + 176 + repo, err := b.GetOrCreateRepo(ctx, did) 177 + if err != nil { 178 + return err 179 + } 180 + 181 + resp, err := b.dir.LookupDID(ctx, syntax.DID(did)) 182 + if err != nil { 183 + return err 184 + } 185 + 186 + c := &xrpclib.Client{ 187 + Host: resp.PDSEndpoint(), 188 + } 189 + 190 + rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 191 + if err != nil { 192 + return err 193 + } 194 + 195 + feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator) 196 + if !ok { 197 + return fmt.Errorf("record we got back wasn't a feed generator somehow") 198 + } 199 + 200 + buf := new(bytes.Buffer) 201 + if err := feedGen.MarshalCBOR(buf); err != nil { 202 + return err 203 + } 204 + 205 + cc, err := cid.Decode(*rec.Cid) 206 + if err != nil { 207 + return err 208 + } 209 + 210 + return b.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc) 211 + }
+6 -5
go.mod
··· 3 3 go 1.25.1 4 4 5 5 require ( 6 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f 6 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe 7 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 7 8 github.com/gorilla/websocket v1.5.1 8 9 github.com/hashicorp/golang-lru/v2 v2.0.7 9 10 github.com/ipfs/go-cid v0.4.1 ··· 60 61 github.com/ipfs/go-metrics-interface v0.0.1 // indirect 61 62 github.com/ipfs/go-peertaskqueue v0.8.1 // indirect 62 63 github.com/ipfs/go-verifcid v0.0.3 // indirect 63 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect 64 + github.com/ipld/go-car v0.6.2 // indirect 64 65 github.com/ipld/go-codec-dagpb v1.6.0 // indirect 65 66 github.com/ipld/go-ipld-prime v0.21.0 // indirect 66 67 github.com/jackc/pgpassfile v1.0.0 // indirect ··· 69 70 github.com/jbenet/goprocess v0.1.4 // indirect 70 71 github.com/jinzhu/inflection v1.0.0 // indirect 71 72 github.com/jinzhu/now v1.1.5 // indirect 72 - github.com/klauspost/compress v1.17.3 // indirect 73 + github.com/klauspost/compress v1.17.9 // indirect 73 74 github.com/klauspost/cpuid/v2 v2.2.7 // indirect 74 75 github.com/lestrrat-go/blackmagic v1.0.1 // indirect 75 76 github.com/lestrrat-go/httpcc v1.0.1 // indirect ··· 91 92 github.com/orandin/slog-gorm v1.3.2 // indirect 92 93 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 93 94 github.com/prometheus/client_model v0.6.1 // indirect 94 - github.com/prometheus/common v0.48.0 // indirect 95 - github.com/prometheus/procfs v0.12.0 // indirect 95 + github.com/prometheus/common v0.54.0 // indirect 96 + github.com/prometheus/procfs v0.15.1 // indirect 96 97 github.com/redis/go-redis/v9 v9.3.0 // indirect 97 98 github.com/russross/blackfriday/v2 v2.1.0 // indirect 98 99 github.com/segmentio/asm v1.2.0 // indirect
+12 -10
go.sum
··· 6 6 github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= 7 7 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= 8 8 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 9 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f h1:FugOoTzh0nCMTWGqNGsjttFWVPcwxaaGD3p/nE9V8qY= 10 - github.com/bluesky-social/indigo v0.0.0-20250909204019-c5eaa30f683f/go.mod h1:n6QE1NDPFoi7PRbMUZmc2y7FibCqiVU4ePpsvhHUBR8= 9 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe h1:VBhaqE5ewQgXbY5SfSWFZC/AwHFo7cHxZKFYi2ce9Yo= 10 + github.com/bluesky-social/indigo v0.0.0-20251009212240-20524de167fe/go.mod h1:RuQVrCGm42QNsgumKaR6se+XkFKfCPNwdCiTvqKRUck= 11 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1 h1:ovcRKN1iXZnY5WApVg+0Hw2RkwMH0ziA7lSAA8vellU= 12 + github.com/bluesky-social/jetstream v0.0.0-20251009222037-7d7efa58d7f1/go.mod h1:5PtGi4r/PjEVBBl+0xWuQn4mBEjr9h6xsfDBADS6cHs= 11 13 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous= 12 14 github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c= 13 15 github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= ··· 158 160 github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= 159 161 github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs= 160 162 github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw= 161 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI= 162 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA= 163 + github.com/ipld/go-car v0.6.2 h1:Hlnl3Awgnq8icK+ze3iRghk805lu8YNq3wlREDTF2qc= 164 + 
github.com/ipld/go-car v0.6.2/go.mod h1:oEGXdwp6bmxJCZ+rARSkDliTeYnVzv3++eXajZ+Bmr8= 163 165 github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= 164 166 github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= 165 167 github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= ··· 188 190 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 189 191 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 190 192 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= 191 - github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= 192 - github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= 193 + github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= 194 + github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 193 195 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 194 196 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 195 197 github.com/koron/go-ssdp v0.0.3 h1:JivLMY45N76b4p/vsWGOKewBQu6uf39y8l+AQ7sDKx8= ··· 312 314 github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= 313 315 github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 314 316 github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= 315 - github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= 316 - github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= 317 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 318 - github.com/prometheus/procfs v0.12.0/go.mod 
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 317 + github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= 318 + github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= 319 + github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= 320 + github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 319 321 github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= 320 322 github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= 321 323 github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
+8 -8
handlers.go
··· 146 146 } 147 147 148 148 if profile.Raw == nil || len(profile.Raw) == 0 { 149 - s.addMissingProfile(ctx, accdid) 149 + s.backend.TrackMissingRecord(accdid, false) 150 150 return e.JSON(404, map[string]any{ 151 151 "error": "missing profile info for user", 152 152 }) ··· 307 307 } 308 308 309 309 if profile.Raw == nil || len(profile.Raw) == 0 { 310 - s.addMissingProfile(ctx, r.Did) 310 + s.backend.TrackMissingRecord(r.Did, false) 311 311 return &authorInfo{ 312 312 Handle: resp.Handle.String(), 313 313 Did: r.Did, ··· 379 379 380 380 uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey) 381 381 if len(p.Raw) == 0 || p.NotFound { 382 - s.addMissingPost(ctx, uri) 382 + s.backend.TrackMissingRecord(uri, false) 383 383 posts[ix] = postResponse{ 384 384 Uri: uri, 385 385 Missing: true, ··· 515 515 quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*") 516 516 if err != nil { 517 517 slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err) 518 - s.addMissingPost(ctx, quotedURI) 518 + s.backend.TrackMissingRecord(quotedURI, false) 519 519 return s.buildQuoteFallback(quotedURI, quotedCid) 520 520 } 521 521 522 522 if quotedPost == nil || quotedPost.Raw == nil || len(quotedPost.Raw) == 0 || quotedPost.NotFound { 523 - s.addMissingPost(ctx, quotedURI) 523 + s.backend.TrackMissingRecord(quotedURI, false) 524 524 return s.buildQuoteFallback(quotedURI, quotedCid) 525 525 } 526 526 ··· 707 707 prof = &p 708 708 } 709 709 } else { 710 - s.addMissingProfile(ctx, r.Did) 710 + s.backend.TrackMissingRecord(r.Did, false) 711 711 } 712 712 713 713 users = append(users, engagementUser{ ··· 767 767 prof = &p 768 768 } 769 769 } else { 770 - s.addMissingProfile(ctx, r.Did) 770 + s.backend.TrackMissingRecord(r.Did, false) 771 771 } 772 772 773 773 users = append(users, engagementUser{ ··· 835 835 prof = &p 836 836 } 837 837 } else { 838 - s.addMissingProfile(ctx, r.Did) 838 + s.backend.TrackMissingRecord(r.Did, false) 839 839 } 840 840 841 841 users = 
append(users, engagementUser{
+16 -10
hydration/post.go
··· 17 17 18 18 // PostInfo contains hydrated post information 19 19 type PostInfo struct { 20 + ID uint 20 21 URI string 21 22 Cid string 22 23 Post *bsky.FeedPost ··· 39 40 ctx, span := tracer.Start(ctx, "hydratePost") 40 41 defer span.End() 41 42 43 + p, err := h.backend.GetPostByUri(ctx, uri, "*") 44 + if err != nil { 45 + return nil, err 46 + } 47 + 48 + return h.HydratePostDB(ctx, uri, p, viewerDID) 49 + } 50 + 51 + func (h *Hydrator) HydratePostDB(ctx context.Context, uri string, dbPost *models.Post, viewerDID string) (*PostInfo, error) { 42 52 autoFetch, _ := ctx.Value("auto-fetch").(bool) 43 53 44 54 authorDid := extractDIDFromURI(uri) ··· 47 57 return nil, err 48 58 } 49 59 50 - // Query post from database 51 - var dbPost models.Post 52 - if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 53 - return nil, fmt.Errorf("failed to query post: %w", err) 54 - } 55 - 56 60 if dbPost.NotFound || len(dbPost.Raw) == 0 { 57 61 if autoFetch { 58 62 h.AddMissingRecord(uri, true) 59 - if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 63 + if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ?`, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 60 64 return nil, fmt.Errorf("failed to query post: %w", err) 61 65 } 62 66 if dbPost.NotFound || len(dbPost.Raw) == 0 { ··· 75 79 76 80 var wg sync.WaitGroup 77 81 78 - // Get author DID 79 - 80 - authorDID := extractDIDFromURI(uri) 82 + authorDID := r.Did 81 83 82 84 // Get engagement counts 83 85 var likes, reposts, replies int ··· 121 123 wg.Wait() 122 124 123 125 info := &PostInfo{ 126 + ID: dbPost.ID, 124 127 URI: uri, 125 128 Cid: dbPost.Cid, 126 129 Post: &feedPost, ··· 385 388 386 389 // hydrateEmbeddedRecord hydrates an embedded record (for quote posts, etc.) 
387 390 func (h *Hydrator) hydrateEmbeddedRecord(ctx context.Context, uri string, viewerDID string) *bsky.EmbedRecord_View_Record { 391 + ctx, span := tracer.Start(ctx, "hydrateEmbeddedRecord") 392 + defer span.End() 393 + 388 394 // Check if it's a post URI 389 395 if !isPostURI(uri) { 390 396 // Could be a feed generator, list, labeler, or starter pack
+10
hydration/utils.go
··· 5 5 "fmt" 6 6 7 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 + "github.com/whyrusleeping/market/models" 8 9 ) 9 10 10 11 func (h *Hydrator) NormalizeUri(ctx context.Context, uri string) (string, error) { ··· 27 28 28 29 return fmt.Sprintf("at://%s/%s/%s", did, puri.Collection().String(), puri.RecordKey().String()), nil 29 30 } 31 + 32 + func (h *Hydrator) UriForPost(ctx context.Context, p *models.Post) (string, error) { 33 + did, err := h.backend.DidFromID(ctx, p.Author) 34 + if err != nil { 35 + return "", err 36 + } 37 + 38 + return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, p.Rkey), nil 39 + }
+40 -103
main.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "log" 8 9 "log/slog" ··· 19 20 "github.com/bluesky-social/indigo/atproto/identity" 20 21 "github.com/bluesky-social/indigo/atproto/identity/redisdir" 21 22 "github.com/bluesky-social/indigo/atproto/syntax" 22 - "github.com/bluesky-social/indigo/cmd/relay/stream" 23 - "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 24 23 "github.com/bluesky-social/indigo/repo" 25 24 "github.com/bluesky-social/indigo/util/cliutil" 26 25 xrpclib "github.com/bluesky-social/indigo/xrpc" 27 - "github.com/gorilla/websocket" 28 26 "github.com/ipfs/go-cid" 29 27 "github.com/jackc/pgx/v5/pgxpool" 30 28 "github.com/prometheus/client_golang/prometheus" ··· 71 69 &cli.StringFlag{ 72 70 Name: "redis-url", 73 71 }, 72 + &cli.StringFlag{ 73 + Name: "sync-config", 74 + }, 74 75 } 75 76 app.Action = func(cctx *cli.Context) error { 76 77 db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections")) ··· 131 132 db.AutoMigrate(StarterPack{}) 132 133 db.AutoMigrate(backend.SyncInfo{}) 133 134 db.AutoMigrate(Notification{}) 135 + db.AutoMigrate(NotificationSeen{}) 134 136 db.AutoMigrate(SequenceTracker{}) 135 137 db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)") 136 138 db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)") 139 + db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)") 137 140 138 141 ctx := context.TODO() 139 142 ··· 198 201 client: cc, 199 202 dir: dir, 200 203 201 - missingRecords: make(chan MissingRecord, 1024), 202 - db: db, 204 + db: db, 203 205 } 204 - fmt.Println("MY DID: ", s.mydid) 205 206 206 - pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, nil) 207 + pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir) 207 208 if err != nil { 208 209 return err 209 210 } ··· 240 241 http.ListenAndServe(":4445", nil) 241 242 }() 242 243 243 - go s.missingRecordFetcher() 
244 + sc := SyncConfig{ 245 + Backends: []SyncBackend{ 246 + { 247 + Type: "firehose", 248 + Host: "bsky.network", 249 + }, 250 + }, 251 + } 244 252 245 - seqno, err := loadLastSeq(db, "firehose_seq") 246 - if err != nil { 247 - fmt.Println("failed to load sequence number, starting over", err) 253 + if scfn := cctx.String("sync-config"); scfn != "" { 254 + { 255 + scfi, err := os.Open(scfn) 256 + if err != nil { 257 + return err 258 + } 259 + defer scfi.Close() 260 + 261 + var lsc SyncConfig 262 + if err := json.NewDecoder(scfi).Decode(&lsc); err != nil { 263 + return err 264 + } 265 + sc = lsc 266 + } 248 267 } 249 268 250 - return s.startLiveTail(ctx, int(seqno), 10, 20) 269 + /* 270 + sc.Backends[0] = SyncBackend{ 271 + Type: "jetstream", 272 + Host: "jetstream1.us-west.bsky.network", 273 + } 274 + */ 275 + 276 + return s.StartSyncEngine(ctx, &sc) 277 + 251 278 } 252 279 253 280 app.RunAndExitOnError() ··· 265 292 seqLk sync.Mutex 266 293 lastSeq int64 267 294 268 - mpLk sync.Mutex 269 - missingRecords chan MissingRecord 295 + mpLk sync.Mutex 270 296 271 297 db *gorm.DB 272 298 } ··· 274 300 func (s *Server) getXrpcClient() (*xrpclib.Client, error) { 275 301 // TODO: handle refreshing the token periodically 276 302 return s.client, nil 277 - } 278 - 279 - func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error { 280 - slog.Info("starting live tail") 281 - 282 - // Connect to the Relay websocket 283 - urlStr := fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs) 284 - 285 - d := websocket.DefaultDialer 286 - con, _, err := d.Dial(urlStr, http.Header{ 287 - "User-Agent": []string{"market/0.0.1"}, 288 - }) 289 - if err != nil { 290 - return fmt.Errorf("failed to connect to relay: %w", err) 291 - } 292 - 293 - var lelk sync.Mutex 294 - lastEvent := time.Now() 295 - 296 - go func() { 297 - for range time.Tick(time.Second) { 298 - lelk.Lock() 299 - let := lastEvent 300 - lelk.Unlock() 301 - 302 - if 
time.Since(let) > time.Second*30 { 303 - slog.Error("firehose connection timed out") 304 - con.Close() 305 - return 306 - } 307 - 308 - } 309 - 310 - }() 311 - 312 - var cclk sync.Mutex 313 - var completeCursor int64 314 - 315 - rsc := &stream.RepoStreamCallbacks{ 316 - RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 317 - ctx := context.Background() 318 - 319 - firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 320 - 321 - s.seqLk.Lock() 322 - if evt.Seq > s.lastSeq { 323 - curs = int(evt.Seq) 324 - s.lastSeq = evt.Seq 325 - 326 - if evt.Seq%1000 == 0 { 327 - if err := storeLastSeq(s.db, "firehose_seq", evt.Seq); err != nil { 328 - fmt.Println("failed to store seqno: ", err) 329 - } 330 - } 331 - } 332 - s.seqLk.Unlock() 333 - 334 - lelk.Lock() 335 - lastEvent = time.Now() 336 - lelk.Unlock() 337 - 338 - if err := s.backend.HandleEvent(ctx, evt); err != nil { 339 - return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 340 - } 341 - 342 - cclk.Lock() 343 - if evt.Seq > completeCursor { 344 - completeCursor = evt.Seq 345 - firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 346 - } 347 - cclk.Unlock() 348 - 349 - return nil 350 - }, 351 - RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 352 - return nil 353 - }, 354 - // TODO: all the other event types 355 - Error: func(errf *stream.ErrorFrame) error { 356 - return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 357 - }, 358 - } 359 - 360 - sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 361 - 362 - //s.eventScheduler = sched 363 - //s.streamFinished = make(chan struct{}) 364 - 365 - return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 366 303 } 367 304 368 305 func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
-234
missing.go
··· 1 - package main 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "fmt" 7 - "log/slog" 8 - 9 - "github.com/bluesky-social/indigo/api/atproto" 10 - "github.com/bluesky-social/indigo/api/bsky" 11 - "github.com/bluesky-social/indigo/atproto/syntax" 12 - xrpclib "github.com/bluesky-social/indigo/xrpc" 13 - "github.com/ipfs/go-cid" 14 - ) 15 - 16 - type MissingRecordType string 17 - 18 - const ( 19 - MissingRecordTypeProfile MissingRecordType = "profile" 20 - MissingRecordTypePost MissingRecordType = "post" 21 - MissingRecordTypeFeedGenerator MissingRecordType = "feedgenerator" 22 - ) 23 - 24 - type MissingRecord struct { 25 - Type MissingRecordType 26 - Identifier string // DID for profiles, AT-URI for posts/feedgens 27 - Wait bool 28 - 29 - waitch chan struct{} 30 - } 31 - 32 - func (s *Server) addMissingRecord(ctx context.Context, rec MissingRecord) { 33 - if rec.Wait { 34 - rec.waitch = make(chan struct{}) 35 - } 36 - 37 - select { 38 - case s.missingRecords <- rec: 39 - case <-ctx.Done(): 40 - } 41 - 42 - if rec.Wait { 43 - select { 44 - case <-rec.waitch: 45 - case <-ctx.Done(): 46 - } 47 - } 48 - } 49 - 50 - // Legacy methods for backward compatibility 51 - func (s *Server) addMissingProfile(ctx context.Context, did string) { 52 - s.addMissingRecord(ctx, MissingRecord{ 53 - Type: MissingRecordTypeProfile, 54 - Identifier: did, 55 - }) 56 - } 57 - 58 - func (s *Server) addMissingPost(ctx context.Context, uri string) { 59 - slog.Info("adding missing post to fetch queue", "uri", uri) 60 - s.addMissingRecord(ctx, MissingRecord{ 61 - Type: MissingRecordTypePost, 62 - Identifier: uri, 63 - }) 64 - } 65 - 66 - func (s *Server) addMissingFeedGenerator(ctx context.Context, uri string) { 67 - slog.Info("adding missing feed generator to fetch queue", "uri", uri) 68 - s.addMissingRecord(ctx, MissingRecord{ 69 - Type: MissingRecordTypeFeedGenerator, 70 - Identifier: uri, 71 - }) 72 - } 73 - 74 - func (s *Server) missingRecordFetcher() { 75 - for rec := range 
s.missingRecords { 76 - var err error 77 - switch rec.Type { 78 - case MissingRecordTypeProfile: 79 - err = s.fetchMissingProfile(context.TODO(), rec.Identifier) 80 - case MissingRecordTypePost: 81 - err = s.fetchMissingPost(context.TODO(), rec.Identifier) 82 - case MissingRecordTypeFeedGenerator: 83 - err = s.fetchMissingFeedGenerator(context.TODO(), rec.Identifier) 84 - default: 85 - slog.Error("unknown missing record type", "type", rec.Type) 86 - continue 87 - } 88 - 89 - if err != nil { 90 - slog.Warn("failed to fetch missing record", "type", rec.Type, "identifier", rec.Identifier, "error", err) 91 - } 92 - 93 - if rec.Wait { 94 - close(rec.waitch) 95 - } 96 - } 97 - } 98 - 99 - func (s *Server) fetchMissingProfile(ctx context.Context, did string) error { 100 - s.backend.AddRelevantDid(did) 101 - 102 - repo, err := s.backend.GetOrCreateRepo(ctx, did) 103 - if err != nil { 104 - return err 105 - } 106 - 107 - resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 108 - if err != nil { 109 - return err 110 - } 111 - 112 - c := &xrpclib.Client{ 113 - Host: resp.PDSEndpoint(), 114 - } 115 - 116 - rec, err := atproto.RepoGetRecord(ctx, c, "", "app.bsky.actor.profile", did, "self") 117 - if err != nil { 118 - return err 119 - } 120 - 121 - prof, ok := rec.Value.Val.(*bsky.ActorProfile) 122 - if !ok { 123 - return fmt.Errorf("record we got back wasnt a profile somehow") 124 - } 125 - 126 - buf := new(bytes.Buffer) 127 - if err := prof.MarshalCBOR(buf); err != nil { 128 - return err 129 - } 130 - 131 - cc, err := cid.Decode(*rec.Cid) 132 - if err != nil { 133 - return err 134 - } 135 - 136 - return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc) 137 - } 138 - 139 - func (s *Server) fetchMissingPost(ctx context.Context, uri string) error { 140 - puri, err := syntax.ParseATURI(uri) 141 - if err != nil { 142 - return fmt.Errorf("invalid AT URI: %s", uri) 143 - } 144 - 145 - did := puri.Authority().String() 146 - collection := puri.Collection().String() 
147 - rkey := puri.RecordKey().String() 148 - 149 - s.backend.AddRelevantDid(did) 150 - 151 - repo, err := s.backend.GetOrCreateRepo(ctx, did) 152 - if err != nil { 153 - return err 154 - } 155 - 156 - resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 157 - if err != nil { 158 - return err 159 - } 160 - 161 - c := &xrpclib.Client{ 162 - Host: resp.PDSEndpoint(), 163 - } 164 - 165 - rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 166 - if err != nil { 167 - return err 168 - } 169 - 170 - post, ok := rec.Value.Val.(*bsky.FeedPost) 171 - if !ok { 172 - return fmt.Errorf("record we got back wasn't a post somehow") 173 - } 174 - 175 - buf := new(bytes.Buffer) 176 - if err := post.MarshalCBOR(buf); err != nil { 177 - return err 178 - } 179 - 180 - cc, err := cid.Decode(*rec.Cid) 181 - if err != nil { 182 - return err 183 - } 184 - 185 - return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc) 186 - } 187 - 188 - func (s *Server) fetchMissingFeedGenerator(ctx context.Context, uri string) error { 189 - puri, err := syntax.ParseATURI(uri) 190 - if err != nil { 191 - return fmt.Errorf("invalid AT URI: %s", uri) 192 - } 193 - 194 - did := puri.Authority().String() 195 - collection := puri.Collection().String() 196 - rkey := puri.RecordKey().String() 197 - s.backend.AddRelevantDid(did) 198 - 199 - repo, err := s.backend.GetOrCreateRepo(ctx, did) 200 - if err != nil { 201 - return err 202 - } 203 - 204 - resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 205 - if err != nil { 206 - return err 207 - } 208 - 209 - c := &xrpclib.Client{ 210 - Host: resp.PDSEndpoint(), 211 - } 212 - 213 - rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey) 214 - if err != nil { 215 - return err 216 - } 217 - 218 - feedGen, ok := rec.Value.Val.(*bsky.FeedGenerator) 219 - if !ok { 220 - return fmt.Errorf("record we got back wasn't a feed generator somehow") 221 - } 222 - 223 - buf := new(bytes.Buffer) 224 - if err := feedGen.MarshalCBOR(buf); err != 
nil { 225 - return err 226 - } 227 - 228 - cc, err := cid.Decode(*rec.Cid) 229 - if err != nil { 230 - return err 231 - } 232 - 233 - return s.backend.HandleCreateFeedGenerator(ctx, repo, rkey, buf.Bytes(), cc) 234 - }
+5
models/models.go
··· 47 47 Key string `gorm:"uniqueIndex"` 48 48 IntVal int64 49 49 } 50 + 51 + type NotificationSeen struct { 52 + Repo uint `gorm:"uniqueindex"` 53 + SeenAt time.Time 54 + }
+8
sync-config-jetstream.json
··· 1 + { 2 + "backends": [ 3 + { 4 + "type": "jetstream", 5 + "host": "jetstream1.us-west.bsky.network" 6 + } 7 + ] 8 + }
+281
sync.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "sync" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/cmd/relay/stream" 13 + "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 14 + jsclient "github.com/bluesky-social/jetstream/pkg/client" 15 + jsparallel "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" 16 + "github.com/bluesky-social/jetstream/pkg/models" 17 + "github.com/gorilla/websocket" 18 + ) 19 + 20 + type SyncConfig struct { 21 + Backends []SyncBackend `json:"backends"` 22 + } 23 + 24 + type SyncBackend struct { 25 + Type string `json:"type"` 26 + Host string `json:"host"` 27 + MaxWorkers int `json:"max_workers,omitempty"` 28 + } 29 + 30 + func (s *Server) StartSyncEngine(ctx context.Context, sc *SyncConfig) error { 31 + for _, be := range sc.Backends { 32 + switch be.Type { 33 + case "firehose": 34 + go s.runSyncFirehose(ctx, be) 35 + case "jetstream": 36 + go s.runSyncJetstream(ctx, be) 37 + default: 38 + return fmt.Errorf("unrecognized sync backend type: %q", be.Type) 39 + } 40 + } 41 + 42 + <-ctx.Done() 43 + return fmt.Errorf("exiting sync routine") 44 + } 45 + 46 + const failureTimeInterval = time.Second * 5 47 + 48 + func (s *Server) runSyncFirehose(ctx context.Context, be SyncBackend) { 49 + var failures int 50 + for { 51 + seqno, err := loadLastSeq(s.db, be.Host) 52 + if err != nil { 53 + fmt.Println("failed to load sequence number, starting over", err) 54 + } 55 + 56 + maxWorkers := 10 57 + if be.MaxWorkers != 0 { 58 + maxWorkers = be.MaxWorkers 59 + } 60 + 61 + start := time.Now() 62 + if err := s.startLiveTail(ctx, be.Host, int(seqno), maxWorkers, 20); err != nil { 63 + slog.Error("firehose connection lost", "host", be.Host, "error", err) 64 + } 65 + 66 + elapsed := time.Since(start) 67 + 68 + if elapsed > failureTimeInterval { 69 + failures = 0 70 + continue 71 + } 72 + failures++ 73 + 74 + delay := 
delayForFailureCount(failures) 75 + slog.Warn("retrying connection after delay", "host", be.Host, "delay", delay) 76 + } 77 + } 78 + 79 + func (s *Server) runSyncJetstream(ctx context.Context, be SyncBackend) { 80 + var failures int 81 + for { 82 + // Load last cursor (stored as sequence number in same table) 83 + cursor, err := loadLastSeq(s.db, be.Host) 84 + if err != nil { 85 + slog.Warn("failed to load jetstream cursor, starting from live", "error", err) 86 + cursor = 0 87 + } 88 + 89 + maxWorkers := 10 90 + if be.MaxWorkers != 0 { 91 + maxWorkers = be.MaxWorkers 92 + } 93 + 94 + start := time.Now() 95 + if err := s.startJetstreamTail(ctx, be.Host, cursor, maxWorkers); err != nil { 96 + slog.Error("jetstream connection lost", "host", be.Host, "error", err) 97 + } 98 + 99 + elapsed := time.Since(start) 100 + 101 + if elapsed > failureTimeInterval { 102 + failures = 0 103 + continue 104 + } 105 + failures++ 106 + 107 + delay := delayForFailureCount(failures) 108 + slog.Warn("retrying jetstream connection after delay", "host", be.Host, "delay", delay) 109 + time.Sleep(delay) 110 + } 111 + } 112 + 113 + func delayForFailureCount(n int) time.Duration { 114 + if n < 5 { 115 + return (time.Second * 5) + (time.Second * 2 * time.Duration(n)) 116 + } 117 + 118 + return time.Second * 30 119 + } 120 + 121 + func (s *Server) startLiveTail(ctx context.Context, host string, curs int, parWorkers, maxQ int) error { 122 + ctx, cancel := context.WithCancel(ctx) 123 + defer cancel() 124 + 125 + slog.Info("starting live tail") 126 + 127 + // Connect to the Relay websocket 128 + urlStr := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", host, curs) 129 + 130 + d := websocket.DefaultDialer 131 + con, _, err := d.Dial(urlStr, http.Header{ 132 + "User-Agent": []string{"konbini/0.0.1"}, 133 + }) 134 + if err != nil { 135 + return fmt.Errorf("failed to connect to relay: %w", err) 136 + } 137 + 138 + var lelk sync.Mutex 139 + lastEvent := time.Now() 140 + 141 + go 
func() { 142 + tick := time.NewTicker(time.Second) 143 + defer tick.Stop() 144 + for { 145 + select { 146 + case <-tick.C: 147 + lelk.Lock() 148 + let := lastEvent 149 + lelk.Unlock() 150 + 151 + if time.Since(let) > time.Second*30 { 152 + slog.Error("firehose connection timed out") 153 + con.Close() 154 + return 155 + } 156 + case <-ctx.Done(): 157 + return 158 + } 159 + } 160 + }() 161 + 162 + var cclk sync.Mutex 163 + var completeCursor int64 164 + 165 + rsc := &stream.RepoStreamCallbacks{ 166 + RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 167 + ctx := context.Background() 168 + 169 + firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 170 + 171 + s.seqLk.Lock() 172 + if evt.Seq > s.lastSeq { 173 + curs = int(evt.Seq) 174 + s.lastSeq = evt.Seq 175 + 176 + if evt.Seq%1000 == 0 { 177 + if err := storeLastSeq(s.db, host, evt.Seq); err != nil { 178 + fmt.Println("failed to store seqno: ", err) 179 + } 180 + } 181 + } 182 + s.seqLk.Unlock() 183 + 184 + lelk.Lock() 185 + lastEvent = time.Now() 186 + lelk.Unlock() 187 + 188 + if err := s.backend.HandleEvent(ctx, evt); err != nil { 189 + return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 190 + } 191 + 192 + cclk.Lock() 193 + if evt.Seq > completeCursor { 194 + completeCursor = evt.Seq 195 + firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 196 + } 197 + cclk.Unlock() 198 + 199 + return nil 200 + }, 201 + RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 202 + return nil 203 + }, 204 + // TODO: all the other event types 205 + Error: func(errf *stream.ErrorFrame) error { 206 + return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 207 + }, 208 + } 209 + 210 + sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 211 + 212 + return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 213 + } 214 + 215 + func (s *Server) startJetstreamTail(ctx context.Context, host string, cursor 
int64, parWorkers int) error { 216 + ctx, cancel := context.WithCancel(ctx) 217 + defer cancel() 218 + 219 + slog.Info("starting jetstream tail", "host", host, "cursor", cursor) 220 + 221 + // Create a scheduler for parallel processing 222 + lastStored := int64(0) 223 + sched := jsparallel.NewScheduler( 224 + parWorkers, 225 + host, 226 + slog.Default(), 227 + func(ctx context.Context, event *models.Event) error { 228 + // Update cursor tracking 229 + s.seqLk.Lock() 230 + if event.TimeUS > s.lastSeq { 231 + s.lastSeq = event.TimeUS 232 + if event.TimeUS-lastStored > 1_000_000 { 233 + // Store checkpoint periodically 234 + if err := storeLastSeq(s.db, host, event.TimeUS); err != nil { 235 + slog.Error("failed to store jetstream cursor", "error", err) 236 + } 237 + lastStored = event.TimeUS 238 + } 239 + } 240 + s.seqLk.Unlock() 241 + 242 + // Update metrics 243 + firehoseCursorGauge.WithLabelValues("ingest").Set(float64(event.TimeUS)) 244 + 245 + // Convert Jetstream event to ATProto event format 246 + if event.Commit != nil { 247 + 248 + if err := s.backend.HandleEventJetstream(ctx, event); err != nil { 249 + return fmt.Errorf("handle event (%s,%d): %w", event.Did, event.TimeUS, err) 250 + } 251 + 252 + firehoseCursorGauge.WithLabelValues("complete").Set(float64(event.TimeUS)) 253 + } 254 + 255 + return nil 256 + }, 257 + ) 258 + 259 + // Configure Jetstream client 260 + config := jsclient.DefaultClientConfig() 261 + config.WebsocketURL = fmt.Sprintf("wss://%s/subscribe", host) 262 + 263 + // Prepare cursor pointer 264 + var cursorPtr *int64 265 + if cursor > 0 { 266 + cursorPtr = &cursor 267 + } 268 + 269 + // Create and connect client 270 + client, err := jsclient.NewClient( 271 + config, 272 + slog.Default(), 273 + sched, 274 + ) 275 + if err != nil { 276 + return fmt.Errorf("create jetstream client: %w", err) 277 + } 278 + 279 + // Start reading from Jetstream 280 + return client.ConnectAndRead(ctx, cursorPtr) 281 + }
+11 -11
xrpc/feed/getPostThread.go
··· 15 15 func HandleGetPostThread(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 16 16 uriParam := c.QueryParam("uri") 17 17 if uriParam == "" { 18 - return c.JSON(http.StatusBadRequest, map[string]interface{}{ 18 + return c.JSON(http.StatusBadRequest, map[string]any{ 19 19 "error": "InvalidRequest", 20 20 "message": "uri parameter is required", 21 21 }) ··· 27 27 // Hydrate the requested post 28 28 postInfo, err := hydrator.HydratePost(ctx, uriParam, viewer) 29 29 if err != nil { 30 - return c.JSON(http.StatusNotFound, map[string]interface{}{ 30 + return c.JSON(http.StatusNotFound, map[string]any{ 31 31 "error": "NotFound", 32 32 "message": "post not found", 33 33 }) ··· 74 74 uri: uri, 75 75 replyTo: tp.ReplyTo, 76 76 inThread: tp.InThread, 77 - replies: []interface{}{}, 77 + replies: []any{}, 78 78 } 79 79 } 80 80 ··· 98 98 } 99 99 100 100 if rootNode == nil { 101 - return c.JSON(http.StatusNotFound, map[string]interface{}{ 101 + return c.JSON(http.StatusNotFound, map[string]any{ 102 102 "error": "NotFound", 103 103 "message": "thread root not found", 104 104 }) ··· 107 107 // Build the response by traversing the tree 108 108 thread := buildThreadView(ctx, db, rootNode, postsByID, hydrator, viewer, nil) 109 109 110 - return c.JSON(http.StatusOK, map[string]interface{}{ 110 + return c.JSON(http.StatusOK, map[string]any{ 111 111 "thread": thread, 112 112 }) 113 113 } ··· 117 117 uri string 118 118 replyTo uint 119 119 inThread uint 120 - replies []interface{} 120 + replies []any 121 121 } 122 122 123 - func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent interface{}) interface{} { 123 + func buildThreadView(ctx context.Context, db *gorm.DB, node *threadPostNode, allNodes map[uint]*threadPostNode, hydrator *hydration.Hydrator, viewer string, parent any) any { 124 124 // Hydrate this post 125 125 postInfo, err := 
hydrator.HydratePost(ctx, node.uri, viewer) 126 126 if err != nil { 127 127 // Return a notFound post 128 - return map[string]interface{}{ 128 + return map[string]any{ 129 129 "$type": "app.bsky.feed.defs#notFoundPost", 130 130 "uri": node.uri, 131 131 } ··· 134 134 // Hydrate author 135 135 authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author) 136 136 if err != nil { 137 - return map[string]interface{}{ 137 + return map[string]any{ 138 138 "$type": "app.bsky.feed.defs#notFoundPost", 139 139 "uri": node.uri, 140 140 } 141 141 } 142 142 143 143 // Build replies 144 - var replies []interface{} 144 + var replies []any 145 145 for _, replyNode := range node.replies { 146 146 if rn, ok := replyNode.(*threadPostNode); ok { 147 147 replyView := buildThreadView(ctx, db, rn, allNodes, hydrator, viewer, nil) ··· 150 150 } 151 151 152 152 // Build the thread view post 153 - var repliesForView interface{} 153 + var repliesForView any 154 154 if len(replies) > 0 { 155 155 repliesForView = replies 156 156 }
+92 -11
xrpc/notification/listNotifications.go
··· 13 13 lexutil "github.com/bluesky-social/indigo/lex/util" 14 14 "github.com/labstack/echo/v4" 15 15 "github.com/whyrusleeping/konbini/hydration" 16 + models "github.com/whyrusleeping/konbini/models" 16 17 "github.com/whyrusleeping/konbini/views" 17 - "github.com/whyrusleeping/market/models" 18 18 "gorm.io/gorm" 19 + "gorm.io/gorm/clause" 19 20 ) 20 21 21 22 // HandleListNotifications implements app.bsky.notification.listNotifications 22 23 func HandleListNotifications(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 23 24 viewer := getUserDID(c) 24 25 if viewer == "" { 25 - return c.JSON(http.StatusUnauthorized, map[string]interface{}{ 26 + return c.JSON(http.StatusUnauthorized, map[string]any{ 26 27 "error": "AuthenticationRequired", 27 28 "message": "authentication required", 28 29 }) ··· 77 78 } 78 79 query += ` ORDER BY n.created_at DESC LIMIT ?` 79 80 80 - var queryArgs []interface{} 81 + var queryArgs []any 81 82 queryArgs = append(queryArgs, viewer) 82 83 if cursor > 0 { 83 84 queryArgs = append(queryArgs, cursor) ··· 85 86 queryArgs = append(queryArgs, limit) 86 87 87 88 if err := db.Raw(query, queryArgs...).Scan(&rows).Error; err != nil { 88 - return c.JSON(http.StatusInternalServerError, map[string]interface{}{ 89 + return c.JSON(http.StatusInternalServerError, map[string]any{ 89 90 "error": "InternalError", 90 91 "message": "failed to query notifications", 91 92 }) ··· 130 131 cursorPtr = &cursor 131 132 } 132 133 134 + var lastSeen time.Time 135 + if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = (select id from repos where did = ?)", viewer).Scan(&lastSeen).Error; err != nil { 136 + return err 137 + } 138 + 139 + var lastSeenStr *string 140 + if !lastSeen.IsZero() { 141 + s := lastSeen.Format(time.RFC3339) 142 + lastSeenStr = &s 143 + } 144 + 133 145 output := &bsky.NotificationListNotifications_Output{ 134 146 Notifications: notifications, 135 147 Cursor: cursorPtr, 148 + SeenAt: lastSeenStr, 136 149 } 137 150 
138 151 return c.JSON(http.StatusOK, output) ··· 142 155 func HandleGetUnreadCount(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 143 156 viewer := getUserDID(c) 144 157 if viewer == "" { 145 - return c.JSON(http.StatusUnauthorized, map[string]interface{}{ 158 + return c.JSON(http.StatusUnauthorized, map[string]any{ 146 159 "error": "AuthenticationRequired", 147 160 "message": "authentication required", 148 161 }) 149 162 } 150 163 151 - // For now, return 0 - we'd need to track read state in the database 152 - return c.JSON(http.StatusOK, map[string]interface{}{ 153 - "count": 0, 164 + var repo models.Repo 165 + if err := db.Find(&repo, "did = ?", viewer).Error; err != nil { 166 + return err 167 + } 168 + 169 + var lastSeen time.Time 170 + if err := db.Raw("SELECT seen_at FROM notification_seens WHERE repo = ?", repo.ID).Scan(&lastSeen).Error; err != nil { 171 + return err 172 + } 173 + 174 + var count int 175 + query := `SELECT count(*) FROM notifications WHERE created_at > ? 
AND for = ?` 176 + if err := db.Raw(query, lastSeen, repo.ID).Scan(&count).Error; err != nil { 177 + return c.JSON(http.StatusInternalServerError, map[string]any{ 178 + "error": "InternalError", 179 + "message": "failed to count unread notifications", 180 + }) 181 + } 182 + 183 + return c.JSON(http.StatusOK, map[string]any{ 184 + "count": count, 154 185 }) 155 186 } 156 187 ··· 158 189 func HandleUpdateSeen(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 159 190 viewer := getUserDID(c) 160 191 if viewer == "" { 161 - return c.JSON(http.StatusUnauthorized, map[string]interface{}{ 192 + return c.JSON(http.StatusUnauthorized, map[string]any{ 162 193 "error": "AuthenticationRequired", 163 194 "message": "authentication required", 164 195 }) 165 196 } 166 197 167 - // For now, just return success - we'd need to track seen timestamps in the database 168 - return c.JSON(http.StatusOK, map[string]interface{}{}) 198 + var body bsky.NotificationUpdateSeen_Input 199 + if err := c.Bind(&body); err != nil { 200 + return c.JSON(http.StatusBadRequest, map[string]any{ 201 + "error": "InvalidRequest", 202 + "message": "invalid request body", 203 + }) 204 + } 205 + 206 + // Parse the seenAt timestamp 207 + seenAt, err := time.Parse(time.RFC3339, body.SeenAt) 208 + if err != nil { 209 + return c.JSON(http.StatusBadRequest, map[string]any{ 210 + "error": "InvalidRequest", 211 + "message": "invalid seenAt timestamp", 212 + }) 213 + } 214 + 215 + // Get the viewer's repo ID 216 + var repoID uint 217 + if err := db.Raw("SELECT id FROM repos WHERE did = ?", viewer).Scan(&repoID).Error; err != nil { 218 + return c.JSON(http.StatusInternalServerError, map[string]any{ 219 + "error": "InternalError", 220 + "message": "failed to find viewer repo", 221 + }) 222 + } 223 + 224 + if repoID == 0 { 225 + return c.JSON(http.StatusInternalServerError, map[string]any{ 226 + "error": "InternalError", 227 + "message": "viewer repo not found", 228 + }) 229 + } 230 + 231 + // Upsert the 
NotificationSeen record 232 + notifSeen := models.NotificationSeen{ 233 + Repo: repoID, 234 + SeenAt: seenAt, 235 + } 236 + 237 + err = db.Clauses(clause.OnConflict{ 238 + Columns: []clause.Column{{Name: "repo"}}, 239 + DoUpdates: clause.AssignmentColumns([]string{"seen_at"}), 240 + }).Create(&notifSeen).Error 241 + 242 + if err != nil { 243 + return c.JSON(http.StatusInternalServerError, map[string]any{ 244 + "error": "InternalError", 245 + "message": "failed to update seen timestamp", 246 + }) 247 + } 248 + 249 + return c.JSON(http.StatusOK, map[string]any{}) 169 250 } 170 251 171 252 func getUserDID(c echo.Context) string {
+159 -131
xrpc/unspecced/getPostThreadV2.go
··· 1 1 package unspecced 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "fmt" 6 7 "log/slog" 7 8 "net/http" 8 9 "strconv" 10 + "sync" 9 11 10 12 "github.com/bluesky-social/indigo/api/bsky" 11 13 "github.com/labstack/echo/v4" 12 14 "github.com/whyrusleeping/konbini/hydration" 13 15 "github.com/whyrusleeping/konbini/views" 16 + "github.com/whyrusleeping/market/models" 17 + "go.opentelemetry.io/otel" 14 18 "gorm.io/gorm" 15 19 ) 16 20 21 + var tracer = otel.Tracer("xrpc/unspecced") 22 + 17 23 // HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2 18 24 func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 19 - ctx := c.Request().Context() 25 + ctx, span := tracer.Start(c.Request().Context(), "getPostThreadV2") 26 + defer span.End() 20 27 ctx = context.WithValue(ctx, "auto-fetch", true) 21 28 22 29 // Parse parameters ··· 69 76 }) 70 77 } 71 78 72 - // Determine the root post ID for the thread 73 - rootPostID := anchorPostInfo.InThread 74 - if rootPostID == 0 { 75 - // This post is the root - get its ID 76 - var postID uint 77 - db.Raw(` 78 - SELECT id FROM posts 79 - WHERE author = (SELECT id FROM repos WHERE did = ?) 80 - AND rkey = ? 81 - `, extractDIDFromURI(anchorUri), extractRkeyFromURI(anchorUri)).Scan(&postID) 82 - rootPostID = postID 79 + threadID := anchorPostInfo.InThread 80 + if threadID == 0 { 81 + threadID = anchorPostInfo.ID 83 82 } 84 83 85 - // Query all posts in this thread 86 - type threadPostRow struct { 87 - ID uint 88 - Rkey string 89 - ReplyTo uint 90 - InThread uint 91 - AuthorDid string 84 + var threadPosts []*models.Post 85 + if err := db.Raw("SELECT * FROM posts WHERE in_thread = ? OR id = ?", threadID, anchorPostInfo.ID).Scan(&threadPosts).Error; err != nil { 86 + return err 92 87 } 93 - var threadPosts []threadPostRow 94 - db.Raw(` 95 - SELECT p.id, p.rkey, p.reply_to, p.in_thread, r.did as author_did 96 - FROM posts p 97 - JOIN repos r ON r.id = p.author 98 - WHERE (p.id = ? 
OR p.in_thread = ?) 99 - AND p.not_found = false 100 - ORDER BY p.created ASC 101 - `, rootPostID, rootPostID).Scan(&threadPosts) 102 88 103 - // Build a map of posts by ID 104 - postsByID := make(map[uint]*threadNode) 105 - for _, tp := range threadPosts { 106 - uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", tp.AuthorDid, tp.Rkey) 107 - postsByID[tp.ID] = &threadNode{ 108 - id: tp.ID, 109 - uri: uri, 110 - replyTo: tp.ReplyTo, 111 - inThread: tp.InThread, 112 - children: []*threadNode{}, 113 - } 114 - } 89 + fmt.Println("GOT THREAD POSTS: ", len(threadPosts)) 115 90 116 - // Build parent-child relationships 117 - for _, node := range postsByID { 118 - if node.replyTo != 0 { 119 - parent := postsByID[node.replyTo] 120 - if parent != nil { 121 - parent.children = append(parent.children, node) 122 - } 123 - } 91 + treeNodes, err := buildThreadTree(ctx, hydrator, db, threadPosts) 92 + if err != nil { 93 + return fmt.Errorf("failed to construct tree: %w", err) 124 94 } 125 95 126 - // Find the anchor node 127 - anchorID := uint(0) 128 - for id, node := range postsByID { 129 - if node.uri == anchorUri { 130 - anchorID = id 131 - break 132 - } 133 - } 134 - 135 - if anchorID == 0 { 136 - return c.JSON(http.StatusNotFound, map[string]interface{}{ 137 - "error": "NotFound", 138 - "message": "anchor post not found in thread", 139 - }) 140 - } 141 - 142 - anchorNode := postsByID[anchorID] 96 + anchor := treeNodes[anchorPostInfo.ID] 143 97 144 98 // Build flat thread items list 145 99 var threadItems []*bsky.UnspeccedGetPostThreadV2_ThreadItem ··· 147 101 148 102 // Add parents if requested 149 103 if above { 150 - parents := collectParents(anchorNode, postsByID) 151 - for i := len(parents) - 1; i >= 0; i-- { 152 - depth := int64(-(len(parents) - i)) 153 - item := buildThreadItem(ctx, hydrator, parents[i], depth, viewer) 104 + parent := anchor.parent 105 + depth := int64(-1) 106 + for parent != nil { 107 + if parent.missing { 108 + fmt.Println("Parent missing: ", depth) 
109 + item := &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 110 + Depth: depth, 111 + Uri: parent.uri, 112 + Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{ 113 + UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{ 114 + LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound", 115 + }, 116 + }, 117 + } 118 + 119 + threadItems = append(threadItems, item) 120 + break 121 + } 122 + 123 + item := buildThreadItem(ctx, hydrator, parent, depth, viewer) 154 124 if item != nil { 155 125 threadItems = append(threadItems, item) 156 126 } 127 + 128 + parent = parent.parent 129 + depth-- 157 130 } 158 131 } 159 132 160 133 // Add anchor post (depth 0) 161 - anchorItem := buildThreadItem(ctx, hydrator, anchorNode, 0, viewer) 134 + anchorItem := buildThreadItem(ctx, hydrator, anchor, 0, viewer) 162 135 if anchorItem != nil { 163 136 threadItems = append(threadItems, anchorItem) 164 137 } 165 138 166 139 // Add replies below anchor 167 140 if below > 0 { 168 - replies, hasMore := collectReplies(ctx, hydrator, anchorNode, 1, below, branchingFactor, sort, viewer) 141 + replies, err := collectReplies(ctx, hydrator, anchor, 0, below, branchingFactor, sort, viewer) 142 + if err != nil { 143 + return err 144 + } 169 145 threadItems = append(threadItems, replies...) 
170 - hasOtherReplies = hasMore 146 + //hasOtherReplies = hasMore 171 147 } 172 148 173 149 return c.JSON(http.StatusOK, &bsky.UnspeccedGetPostThreadV2_Output{ ··· 176 152 }) 177 153 } 178 154 179 - type threadNode struct { 180 - id uint 181 - uri string 182 - replyTo uint 183 - inThread uint 184 - children []*threadNode 185 - } 155 + func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, curnode *threadTree, depth int64, below int64, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, error) { 156 + if below == 0 { 157 + return nil, nil 158 + } 186 159 187 - func collectParents(node *threadNode, allNodes map[uint]*threadNode) []*threadNode { 188 - var parents []*threadNode 189 - current := node 190 - for current.replyTo != 0 { 191 - parent := allNodes[current.replyTo] 192 - if parent == nil { 193 - break 194 - } 195 - parents = append(parents, parent) 196 - current = parent 160 + type parThreadResults struct { 161 + node *bsky.UnspeccedGetPostThreadV2_ThreadItem 162 + children []*bsky.UnspeccedGetPostThreadV2_ThreadItem 197 163 } 198 - return parents 199 - } 164 + 165 + results := make([]parThreadResults, len(curnode.children)) 166 + 167 + var wg sync.WaitGroup 168 + for i := range curnode.children { 169 + ix := i 170 + wg.Go(func() { 171 + child := curnode.children[ix] 172 + 173 + results[ix].node = buildThreadItem(ctx, hydrator, child, depth+1, viewer) 174 + if child.missing { 175 + return 176 + } 200 177 201 - func collectReplies(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, currentDepth, maxDepth, branchingFactor int64, sort string, viewer string) ([]*bsky.UnspeccedGetPostThreadV2_ThreadItem, bool) { 202 - var items []*bsky.UnspeccedGetPostThreadV2_ThreadItem 203 - hasMore := false 178 + sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer) 179 + if err != nil { 180 + slog.Error("failed to collect replies", "node", child.uri, "error", 
err) 181 + return 182 + } 204 183 205 - if currentDepth > maxDepth { 206 - return items, false 184 + results[ix].children = sub 185 + }) 207 186 } 208 187 209 - // Sort children based on sort parameter 210 - children := node.children 211 - // TODO: Actually sort based on the sort parameter (newest/oldest/top) 212 - // For now, just use the order we have 188 + wg.Wait() 213 189 214 - // Limit to branchingFactor 215 - limit := int(branchingFactor) 216 - if len(children) > limit { 217 - hasMore = true 218 - children = children[:limit] 190 + var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem 191 + for _, res := range results { 192 + out = append(out, res.node) 193 + out = append(out, res.children...) 219 194 } 220 195 221 - for _, child := range children { 222 - item := buildThreadItem(ctx, hydrator, child, currentDepth, viewer) 223 - if item != nil { 224 - items = append(items, item) 196 + return out, nil 197 + } 225 198 226 - // Recursively collect replies 227 - if currentDepth < maxDepth { 228 - childReplies, childHasMore := collectReplies(ctx, hydrator, child, currentDepth+1, maxDepth, branchingFactor, sort, viewer) 229 - items = append(items, childReplies...) 
230 - if childHasMore { 231 - hasMore = true 232 - } 233 - } 199 + func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadTree, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem { 200 + if node.missing { 201 + return &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 202 + Depth: depth, 203 + Uri: node.uri, 204 + Value: &bsky.UnspeccedGetPostThreadV2_ThreadItem_Value{ 205 + UnspeccedDefs_ThreadItemNotFound: &bsky.UnspeccedDefs_ThreadItemNotFound{ 206 + LexiconTypeID: "app.bsky.unspecced.defs#threadItemNotFound", 207 + }, 208 + }, 234 209 } 235 210 } 236 211 237 - return items, hasMore 238 - } 239 - 240 - func buildThreadItem(ctx context.Context, hydrator *hydration.Hydrator, node *threadNode, depth int64, viewer string) *bsky.UnspeccedGetPostThreadV2_ThreadItem { 241 212 // Hydrate the post 242 - postInfo, err := hydrator.HydratePost(ctx, node.uri, viewer) 213 + postInfo, err := hydrator.HydratePostDB(ctx, node.uri, node.val, viewer) 243 214 if err != nil { 215 + slog.Error("failed to hydrate post in thread item", "uri", node.uri, "error", err) 244 216 // Return not found item 245 217 return &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 246 218 Depth: depth, ··· 256 228 // Hydrate author 257 229 authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author) 258 230 if err != nil { 231 + slog.Error("failed to hydrate actor in thread item", "author", postInfo.Author, "error", err) 259 232 return &bsky.UnspeccedGetPostThreadV2_ThreadItem{ 260 233 Depth: depth, 261 234 Uri: node.uri, ··· 319 292 return string(parts) 320 293 } 321 294 322 - func extractRkeyFromURI(uri string) string { 323 - // URI format: at://did:plc:xxx/collection/rkey 324 - if len(uri) < 5 || uri[:5] != "at://" { 325 - return "" 295 + type threadTree struct { 296 + parent *threadTree 297 + children []*threadTree 298 + 299 + val *models.Post 300 + 301 + missing bool 302 + 303 + uri string 304 + cid string 305 + } 306 + 307 + func buildThreadTree(ctx 
context.Context, hydrator *hydration.Hydrator, db *gorm.DB, posts []*models.Post) (map[uint]*threadTree, error) { 308 + nodes := make(map[uint]*threadTree) 309 + for _, p := range posts { 310 + puri, err := hydrator.UriForPost(ctx, p) 311 + if err != nil { 312 + return nil, err 313 + } 314 + 315 + t := &threadTree{ 316 + val: p, 317 + uri: puri, 318 + } 319 + 320 + nodes[p.ID] = t 326 321 } 327 - // Find last slash 328 - for i := len(uri) - 1; i >= 5; i-- { 329 - if uri[i] == '/' { 330 - return uri[i+1:] 322 + 323 + missing := make(map[uint]*threadTree) 324 + for _, node := range nodes { 325 + if node.val.ReplyTo == 0 { 326 + continue 331 327 } 328 + 329 + pnode, ok := nodes[node.val.ReplyTo] 330 + if !ok { 331 + pnode = &threadTree{ 332 + missing: true, 333 + } 334 + missing[node.val.ReplyTo] = pnode 335 + 336 + var bspost bsky.FeedPost 337 + if err := bspost.UnmarshalCBOR(bytes.NewReader(node.val.Raw)); err != nil { 338 + return nil, err 339 + } 340 + 341 + if bspost.Reply == nil || bspost.Reply.Parent == nil { 342 + return nil, fmt.Errorf("node with parent had no parent in object") 343 + } 344 + 345 + pnode.uri = bspost.Reply.Parent.Uri 346 + pnode.cid = bspost.Reply.Parent.Cid 347 + 348 + /* Maybe we could force hydrate these? 349 + hydrator.AddMissingRecord(puri, true) 350 + */ 351 + } 352 + 353 + pnode.children = append(pnode.children, node) 354 + node.parent = pnode 332 355 } 333 - return "" 356 + 357 + for k, v := range missing { 358 + nodes[k] = v 359 + } 360 + 361 + return nodes, nil 334 362 }