A locally focused bluesky appview
at master 31 kB view raw
1package backend 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "strings" 10 "time" 11 12 "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/api/bsky" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 lexutil "github.com/bluesky-social/indigo/lex/util" 16 "github.com/bluesky-social/indigo/repo" 17 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 18 "github.com/ipfs/go-cid" 19 "github.com/jackc/pgx/v5/pgconn" 20 "github.com/prometheus/client_golang/prometheus" 21 "github.com/prometheus/client_golang/prometheus/promauto" 22 23 . "github.com/whyrusleeping/konbini/models" 24) 25 26var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{ 27 Name: "handle_op_duration", 28 Help: "A histogram of op handling durations", 29 Buckets: prometheus.ExponentialBuckets(1, 2, 15), 30}, []string{"op", "collection"}) 31 32func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { 33 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 34 if err != nil { 35 return fmt.Errorf("failed to read event repo: %w", err) 36 } 37 38 for _, op := range evt.Ops { 39 switch op.Action { 40 case "create": 41 c, rec, err := r.GetRecordBytes(ctx, op.Path) 42 if err != nil { 43 return err 44 } 45 if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil { 46 return fmt.Errorf("create record failed: %w", err) 47 } 48 case "update": 49 c, rec, err := r.GetRecordBytes(ctx, op.Path) 50 if err != nil { 51 return err 52 } 53 if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil { 54 return fmt.Errorf("update record failed: %w", err) 55 } 56 case "delete": 57 if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil { 58 return fmt.Errorf("delete record failed: %w", err) 59 } 60 } 61 } 62 63 // TODO: sync with the Since field to make sure we don't miss events we care about 64 /* 65 if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { 66 return fmt.Errorf("failed to update rev: %w", err) 67 } 68 */ 69 70 return nil 71} 72 73func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) { 74 val, err := lexutil.NewFromType(evt.Commit.Collection) 75 if err != nil { 76 return nil, fmt.Errorf("failed to load event record type: %w", err) 77 } 78 79 if err := json.Unmarshal(evt.Commit.Record, val); err != nil { 80 return nil, err 81 } 82 83 cval, ok := val.(lexutil.CBOR) 84 if !ok { 85 return nil, fmt.Errorf("decoded type was not cbor marshalable") 86 } 87 88 buf := new(bytes.Buffer) 89 if err := cval.MarshalCBOR(buf); err != nil { 90 return nil, fmt.Errorf("failed to marshal event to cbor: %w", err) 91 } 92 93 rec := buf.Bytes() 94 return rec, nil 95} 96 97func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error { 98 99 path := evt.Commit.Collection + "/" + evt.Commit.RKey 100 switch evt.Commit.Operation { 101 case jsmodels.CommitOperationCreate: 102 rec, err := cborBytesFromEvent(evt) 103 if err != nil { 104 return err 105 } 106 107 c, err := cid.Decode(evt.Commit.CID) 108 if err != nil { 109 return err 110 } 111 112 if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 113 return fmt.Errorf("create record failed: %w", err) 114 } 115 case jsmodels.CommitOperationUpdate: 116 rec, err := cborBytesFromEvent(evt) 117 if err != nil { 118 return err 119 } 120 121 c, err := cid.Decode(evt.Commit.CID) 122 if err != nil { 123 return err 124 } 125 126 if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil { 127 return fmt.Errorf("update record failed: %w", err) 128 } 129 case jsmodels.CommitOperationDelete: 130 if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil { 131 return fmt.Errorf("delete record failed: %w", err) 132 } 133 } 134 135 return nil 136} 137 138func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error { 139 start := time.Now() 140 141 rr, err := b.GetOrCreateRepo(ctx, repo) 142 if err != nil { 143 return fmt.Errorf("get user failed: %w", err) 144 } 145 146 lrev, err := b.revForRepo(rr) 147 if err != nil { 148 return err 149 } 150 if lrev != "" { 151 if rev < lrev { 152 slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path) 153 return nil 154 } 155 } 156 157 parts := strings.Split(path, "/") 158 if len(parts) != 2 { 159 return fmt.Errorf("invalid path in HandleCreate: %q", path) 160 } 161 col := parts[0] 162 rkey := parts[1] 163 164 defer func() { 165 handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds())) 166 }() 167 168 if rkey == "" { 169 fmt.Printf("messed up path: %q\n", rkey) 170 } 171 172 switch col { 173 case "app.bsky.feed.post": 174 if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil { 175 return err 176 } 177 case "app.bsky.feed.like": 178 if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil { 179 return err 180 } 181 case "app.bsky.feed.repost": 182 if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil { 183 return err 184 } 185 case "app.bsky.graph.follow": 186 if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil { 187 return err 188 } 189 case "app.bsky.graph.block": 190 if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil { 191 return err 192 } 193 case "app.bsky.graph.list": 194 if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil { 195 return err 196 } 197 case "app.bsky.graph.listitem": 198 if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil { 199 return err 200 } 201 case "app.bsky.graph.listblock": 202 if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil { 203 return err 204 } 205 case "app.bsky.actor.profile": 206 if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil { 207 return err 208 } 209 case "app.bsky.feed.generator": 210 if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil { 211 return err 212 } 213 case "app.bsky.feed.threadgate": 214 if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil { 215 return err 216 } 217 case "chat.bsky.actor.declaration": 218 if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil { 219 return err 220 } 221 case "app.bsky.feed.postgate": 222 if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil { 223 return err 224 } 225 case "app.bsky.graph.starterpack": 226 if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil { 227 return err 228 } 229 default: 230 slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev) 231 } 232 233 b.revCache.Add(rr.ID, rev) 234 return nil 235} 236 237func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 238 exists, err := b.checkPostExists(ctx, repo, rkey) 239 if err != nil { 240 return err 241 } 242 243 // still technically a race condition if two creates for the same post happen concurrently... probably fine 244 if exists { 245 return nil 246 } 247 248 var rec bsky.FeedPost 249 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 250 uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey 251 slog.Warn("skipping post with malformed data", "uri", uri, "error", err) 252 return nil // Skip this post rather than failing the entire event 253 } 254 255 reldids := []string{repo.Did} 256 // care about a post if its in a thread of a user we are interested in 257 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 258 reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri) 259 } 260 // TODO: maybe also care if its mentioning a user we care about or quoting a user we care about? 261 if !b.anyRelevantIdents(reldids...) { 262 return nil 263 } 264 265 uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey 266 slog.Warn("adding post", "uri", uri) 267 268 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 269 if err != nil { 270 return fmt.Errorf("invalid timestamp: %w", err) 271 } 272 273 p := Post{ 274 Created: created.Time(), 275 Indexed: time.Now(), 276 Author: repo.ID, 277 Rkey: rkey, 278 Raw: recb, 279 Cid: cc.String(), 280 } 281 282 if rec.Reply != nil && rec.Reply.Parent != nil { 283 if rec.Reply.Root == nil { 284 return fmt.Errorf("post reply had nil root") 285 } 286 287 pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri) 288 if err != nil { 289 return fmt.Errorf("getting reply parent: %w", err) 290 } 291 292 p.ReplyTo = pinfo.ID 293 p.ReplyToUsr = pinfo.Author 294 295 thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri) 296 if err != nil { 297 return fmt.Errorf("getting thread root: %w", err) 298 } 299 300 p.InThread = thread 301 302 r, err := b.GetOrCreateRepo(ctx, b.mydid) 303 if err != nil { 304 return err 305 } 306 307 if p.ReplyToUsr == r.ID { 308 if err := b.AddNotification(ctx, r.ID, p.Author, uri, cc, NotifKindReply); err != nil { 309 slog.Warn("failed to create notification", "uri", uri, "error", err) 310 } 311 } 312 } 313 314 if rec.Embed != nil { 315 var rpref string 316 if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 317 rpref = rec.Embed.EmbedRecord.Record.Uri 318 } 319 if rec.Embed.EmbedRecordWithMedia != nil && 320 rec.Embed.EmbedRecordWithMedia.Record != nil && 321 rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 322 rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 323 } 324 325 if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") { 326 rp, err := b.postIDForUri(ctx, rpref) 327 if err != nil { 328 return fmt.Errorf("getting quote subject: %w", err) 329 } 330 331 p.Reposting = rp 332 } 333 } 334 335 if err := b.doPostCreate(ctx, &p); err != nil { 336 return err 337 } 338 339 // Check for mentions and create notifications 340 if rec.Facets != nil { 341 for _, facet := range rec.Facets { 342 for _, feature := range facet.Features { 343 if feature.RichtextFacet_Mention != nil { 344 mentionDid := feature.RichtextFacet_Mention.Did 345 // This is a mention 346 mentionedRepo, err := b.GetOrCreateRepo(ctx, mentionDid) 347 if err != nil { 348 slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err) 349 continue 350 } 351 352 // Create notification if the mentioned user is the current user 353 if mentionedRepo.ID == b.myrepo.ID { 354 if err := b.AddNotification(ctx, b.myrepo.ID, p.Author, uri, cc, NotifKindMention); err != nil { 355 slog.Warn("failed to create mention notification", "uri", uri, "error", err) 356 } 357 } 358 } 359 } 360 } 361 } 362 363 b.postInfoCache.Add(uri, cachedPostInfo{ 364 ID: p.ID, 365 Author: p.Author, 366 }) 367 368 return nil 369} 370 371func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error { 372 /* 373 if err := b.db.Clauses(clause.OnConflict{ 374 Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}}, 375 DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}), 376 }).Create(p).Error; err != nil { 377 return err 378 } 379 */ 380 381 query := ` 382INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread) 383VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 384ON CONFLICT (author, rkey) 385DO UPDATE SET 386 cid = $3, 387 not_found = $4, 388 raw = $5, 389 created = $6, 390 indexed = $7, 391 reposting = $8, 392 reply_to = $9, 393 reply_to_usr = $10, 394 in_thread = $11 395RETURNING id 396` 397 398 // Execute the query with parameters from the Post struct 399 if err := b.pgx.QueryRow( 400 ctx, 401 query, 402 p.Author, 403 p.Rkey, 404 p.Cid, 405 p.NotFound, 406 p.Raw, 407 p.Created, 408 p.Indexed, 409 p.Reposting, 410 p.ReplyTo, 411 p.ReplyToUsr, 412 p.InThread, 413 ).Scan(&p.ID); err != nil { 414 return err 415 } 416 417 return nil 418} 419 420func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 421 var rec bsky.FeedLike 422 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 423 return err 424 } 425 426 if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) { 427 return nil 428 } 429 430 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 431 if err != nil { 432 return fmt.Errorf("invalid timestamp: %w", err) 433 } 434 435 pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri) 436 if err != nil { 437 return fmt.Errorf("getting like subject: %w", err) 438 } 439 440 if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil { 441 pgErr, ok := err.(*pgconn.PgError) 442 if ok && pgErr.Code == "23505" { 443 return nil 444 } 445 return err 446 } 447 448 // Create notification if the liked post belongs to the current user 449 if pinfo.Author == b.myrepo.ID { 450 uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey) 451 if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindLike); err != nil { 452 slog.Warn("failed to create like notification", "uri", uri, "error", err) 453 } 454 } 455 456 return nil 457} 458 459func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 460 var rec bsky.FeedRepost 461 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 462 return err 463 } 464 465 if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) { 466 return nil 467 } 468 469 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 470 if err != nil { 471 return fmt.Errorf("invalid timestamp: %w", err) 472 } 473 474 pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri) 475 if err != nil { 476 return fmt.Errorf("getting repost subject: %w", err) 477 } 478 479 if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil { 480 pgErr, ok := err.(*pgconn.PgError) 481 if ok && pgErr.Code == "23505" { 482 return nil 483 } 484 return err 485 } 486 487 // Create notification if the reposted post belongs to the current user 488 if pinfo.Author == b.myrepo.ID { 489 uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey) 490 if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindRepost); err != nil { 491 slog.Warn("failed to create repost notification", "uri", uri, "error", err) 492 } 493 } 494 495 return nil 496} 497 498func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 499 var rec bsky.GraphFollow 500 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 501 return err 502 } 503 504 if !b.anyRelevantIdents(repo.Did, rec.Subject) { 505 return nil 506 } 507 508 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 509 if err != nil { 510 return fmt.Errorf("invalid timestamp: %w", err) 511 } 512 513 subj, err := b.GetOrCreateRepo(ctx, rec.Subject) 514 if err != nil { 515 return err 516 } 517 518 if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil { 519 return err 520 } 521 522 return nil 523} 524 525func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 526 var rec bsky.GraphBlock 527 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 528 return err 529 } 530 531 if !b.anyRelevantIdents(repo.Did, rec.Subject) { 532 return nil 533 } 534 535 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 536 if err != nil { 537 return fmt.Errorf("invalid timestamp: %w", err) 538 } 539 540 subj, err := b.GetOrCreateRepo(ctx, rec.Subject) 541 if err != nil { 542 return err 543 } 544 545 if err := b.db.Create(&Block{ 546 Created: created.Time(), 547 Indexed: time.Now(), 548 Author: repo.ID, 549 Rkey: rkey, 550 Subject: subj.ID, 551 }).Error; err != nil { 552 return err 553 } 554 555 return nil 556} 557 558func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 559 var rec bsky.GraphList 560 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 561 return err 562 } 563 564 if !b.anyRelevantIdents(repo.Did) { 565 return nil 566 } 567 568 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 569 if err != nil { 570 return fmt.Errorf("invalid timestamp: %w", err) 571 } 572 573 if err := b.db.Create(&List{ 574 Created: created.Time(), 575 Indexed: time.Now(), 576 Author: repo.ID, 577 Rkey: rkey, 578 Raw: recb, 579 }).Error; err != nil { 580 return err 581 } 582 583 return nil 584} 585 586func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 587 var rec bsky.GraphListitem 588 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 589 return err 590 } 591 if !b.anyRelevantIdents(repo.Did) { 592 return nil 593 } 594 595 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 596 if err != nil { 597 return fmt.Errorf("invalid timestamp: %w", err) 598 } 599 600 subj, err := b.GetOrCreateRepo(ctx, rec.Subject) 601 if err != nil { 602 return err 603 } 604 605 list, err := b.GetOrCreateList(ctx, rec.List) 606 if err != nil { 607 return err 608 } 609 610 if err := b.db.Create(&ListItem{ 611 Created: created.Time(), 612 Indexed: time.Now(), 613 Author: repo.ID, 614 Rkey: rkey, 615 Subject: subj.ID, 616 List: list.ID, 617 }).Error; err != nil { 618 return err 619 } 620 621 return nil 622} 623 624func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 625 var rec bsky.GraphListblock 626 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 627 return err 628 } 629 630 if !b.anyRelevantIdents(repo.Did, rec.Subject) { 631 return nil 632 } 633 634 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 635 if err != nil { 636 return fmt.Errorf("invalid timestamp: %w", err) 637 } 638 639 list, err := b.GetOrCreateList(ctx, rec.Subject) 640 if err != nil { 641 return err 642 } 643 644 if err := b.db.Create(&ListBlock{ 645 Created: created.Time(), 646 Indexed: time.Now(), 647 Author: repo.ID, 648 Rkey: rkey, 649 List: list.ID, 650 }).Error; err != nil { 651 return err 652 } 653 654 return nil 655} 656 657func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error { 658 if !b.anyRelevantIdents(repo.Did) { 659 return nil 660 } 661 662 if err := b.db.Create(&Profile{ 663 //Created: created.Time(), 664 Indexed: time.Now(), 665 Repo: repo.ID, 666 Raw: recb, 667 Rev: rev, 668 }).Error; err != nil { 669 return err 670 } 671 672 return nil 673} 674 675func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error { 676 if !b.anyRelevantIdents(repo.Did) { 677 return nil 678 } 679 680 if err := b.db.Create(&Profile{ 681 Indexed: time.Now(), 682 Repo: repo.ID, 683 Raw: recb, 684 Rev: rev, 685 }).Error; err != nil { 686 return err 687 } 688 689 return nil 690} 691 692func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 693 if !b.anyRelevantIdents(repo.Did) { 694 return nil 695 } 696 697 var rec bsky.FeedGenerator 698 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 699 return err 700 } 701 702 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 703 if err != nil { 704 return fmt.Errorf("invalid timestamp: %w", err) 705 } 706 707 if err := b.db.Create(&FeedGenerator{ 708 Created: created.Time(), 709 Indexed: time.Now(), 710 Author: repo.ID, 711 Rkey: rkey, 712 Did: rec.Did, 713 Raw: recb, 714 }).Error; err != nil { 715 return err 716 } 717 718 return nil 719} 720 721func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 722 if !b.anyRelevantIdents(repo.Did) { 723 return nil 724 } 725 var rec bsky.FeedThreadgate 726 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 727 return err 728 } 729 730 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 731 if err != nil { 732 return fmt.Errorf("invalid timestamp: %w", err) 733 } 734 735 pid, err := b.postIDForUri(ctx, rec.Post) 736 if err != nil { 737 return err 738 } 739 740 if err := b.db.Create(&ThreadGate{ 741 Created: created.Time(), 742 Indexed: time.Now(), 743 Author: repo.ID, 744 Rkey: rkey, 745 Post: pid, 746 }).Error; err != nil { 747 return err 748 } 749 750 return nil 751} 752 753func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 754 // TODO: maybe track these? 755 return nil 756} 757 758func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 759 if !b.anyRelevantIdents(repo.Did) { 760 return nil 761 } 762 var rec bsky.FeedPostgate 763 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 764 return err 765 } 766 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 767 if err != nil { 768 return fmt.Errorf("invalid timestamp: %w", err) 769 } 770 771 refPost, err := b.postInfoForUri(ctx, rec.Post) 772 if err != nil { 773 return err 774 } 775 776 if err := b.db.Create(&PostGate{ 777 Created: created.Time(), 778 Indexed: time.Now(), 779 Author: repo.ID, 780 Rkey: rkey, 781 Subject: refPost.ID, 782 Raw: recb, 783 }).Error; err != nil { 784 return err 785 } 786 787 return nil 788} 789 790func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 791 if !b.anyRelevantIdents(repo.Did) { 792 return nil 793 } 794 var rec bsky.GraphStarterpack 795 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 796 return err 797 } 798 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 799 if err != nil { 800 return fmt.Errorf("invalid timestamp: %w", err) 801 } 802 803 list, err := b.GetOrCreateList(ctx, rec.List) 804 if err != nil { 805 return err 806 } 807 808 if err := b.db.Create(&StarterPack{ 809 Created: created.Time(), 810 Indexed: time.Now(), 811 Author: repo.ID, 812 Rkey: rkey, 813 Raw: recb, 814 List: list.ID, 815 }).Error; err != nil { 816 return err 817 } 818 819 return nil 820} 821 822func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error { 823 start := time.Now() 824 825 rr, err := b.GetOrCreateRepo(ctx, repo) 826 if err != nil { 827 return fmt.Errorf("get user failed: %w", err) 828 } 829 830 lrev, err := b.revForRepo(rr) 831 if err != nil { 832 return err 833 } 834 if lrev != "" { 835 if rev < lrev { 836 //slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path) 837 return nil 838 } 839 } 840 841 parts := strings.Split(path, "/") 842 if len(parts) != 2 { 843 return fmt.Errorf("invalid path in HandleCreate: %q", path) 844 } 845 col := parts[0] 846 rkey := parts[1] 847 848 defer func() { 849 handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds())) 850 }() 851 852 if rkey == "" { 853 fmt.Printf("messed up path: %q\n", rkey) 854 } 855 856 switch col { 857 /* 858 case "app.bsky.feed.post": 859 if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil { 860 return err 861 } 862 case "app.bsky.feed.like": 863 if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil { 864 return err 865 } 866 case "app.bsky.feed.repost": 867 if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil { 868 return err 869 } 870 case "app.bsky.graph.follow": 871 if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil { 872 return err 873 } 874 case "app.bsky.graph.block": 875 if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil { 876 return err 877 } 878 case "app.bsky.graph.list": 879 if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil { 880 return err 881 } 882 case "app.bsky.graph.listitem": 883 if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil { 884 return err 885 } 886 case "app.bsky.graph.listblock": 887 if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil { 888 return err 889 } 890 */ 891 case "app.bsky.actor.profile": 892 if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil { 893 return err 894 } 895 /* 896 case "app.bsky.feed.generator": 897 if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil { 898 return err 899 } 900 case "app.bsky.feed.threadgate": 901 if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil { 902 return err 903 } 904 case "chat.bsky.actor.declaration": 905 if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil { 906 return err 907 } 908 */ 909 default: 910 slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev) 911 } 912 913 return nil 914} 915 916func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error { 917 start := time.Now() 918 919 rr, err := b.GetOrCreateRepo(ctx, repo) 920 if err != nil { 921 return fmt.Errorf("get user failed: %w", err) 922 } 923 924 lrev, ok := b.revCache.Get(rr.ID) 925 if ok { 926 if rev < lrev { 927 //slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev) 928 return nil 929 } 930 } 931 932 parts := strings.Split(path, "/") 933 if len(parts) != 2 { 934 return fmt.Errorf("invalid path in HandleDelete: %q", path) 935 } 936 col := parts[0] 937 rkey := parts[1] 938 939 defer func() { 940 handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds())) 941 }() 942 943 switch col { 944 case "app.bsky.feed.post": 945 if err := b.HandleDeletePost(ctx, rr, rkey); err != nil { 946 return err 947 } 948 case "app.bsky.feed.like": 949 if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil { 950 return err 951 } 952 case "app.bsky.feed.repost": 953 if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil { 954 return err 955 } 956 case "app.bsky.graph.follow": 957 if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil { 958 return err 959 } 960 case "app.bsky.graph.block": 961 if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil { 962 return err 963 } 964 case "app.bsky.graph.list": 965 if err := b.HandleDeleteList(ctx, rr, rkey); err != nil { 966 return err 967 } 968 case "app.bsky.graph.listitem": 969 if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil { 970 return err 971 } 972 case "app.bsky.graph.listblock": 973 if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil { 974 return err 975 } 976 case "app.bsky.actor.profile": 977 if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil { 978 return err 979 } 980 case "app.bsky.feed.generator": 981 if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil { 982 return err 983 } 984 case "app.bsky.feed.threadgate": 985 if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil { 986 return err 987 } 988 default: 989 slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev) 990 } 991 992 b.revCache.Add(rr.ID, rev) 993 return nil 994} 995 996func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error { 997 var p Post 998 if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 999 return err 1000 } 1001 1002 if p.ID == 0 { 1003 //slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey) 1004 return nil 1005 } 1006 1007 if err := b.db.Delete(&Post{}, p.ID).Error; err != nil { 1008 return err 1009 } 1010 1011 return nil 1012} 1013 1014func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error { 1015 var like Like 1016 if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1017 return err 1018 } 1019 1020 if like.ID == 0 { 1021 //slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey) 1022 return nil 1023 } 1024 1025 if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil { 1026 return err 1027 } 1028 1029 return nil 1030} 1031 1032func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error { 1033 var repost Repost 1034 if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1035 return err 1036 } 1037 1038 if repost.ID == 0 { 1039 //return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey) 1040 return nil 1041 } 1042 1043 if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil { 1044 return err 1045 } 1046 1047 return nil 1048} 1049 1050func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error { 1051 var follow Follow 1052 if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1053 return err 1054 } 1055 1056 if follow.ID == 0 { 1057 //slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey) 1058 return nil 1059 } 1060 1061 if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil { 1062 return err 1063 } 1064 1065 return nil 1066} 1067 1068func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error { 1069 var block Block 1070 if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1071 return err 1072 } 1073 1074 if block.ID == 0 { 1075 //slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey) 1076 return nil 1077 } 1078 1079 if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil { 1080 return err 1081 } 1082 1083 return nil 1084} 1085 1086func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error { 1087 var list List 1088 if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1089 return err 1090 } 1091 1092 if list.ID == 0 { 1093 return nil 1094 //return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey) 1095 } 1096 1097 if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil { 1098 return err 1099 } 1100 1101 return nil 1102} 1103 1104func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error { 1105 var item ListItem 1106 if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1107 return err 1108 } 1109 1110 if item.ID == 0 { 1111 return nil 1112 //return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey) 1113 } 1114 1115 if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil { 1116 return err 1117 } 1118 1119 return nil 1120} 1121 1122func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error { 1123 var block ListBlock 1124 if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1125 return err 1126 } 1127 1128 if block.ID == 0 { 1129 return nil 1130 //return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey) 1131 } 1132 1133 if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil { 1134 return err 1135 } 1136 1137 return nil 1138} 1139 1140func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error { 1141 var feedgen FeedGenerator 1142 if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1143 return err 1144 } 1145 1146 if feedgen.ID == 0 { 1147 return nil 1148 //return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey) 1149 } 1150 1151 if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil { 1152 return err 1153 } 1154 1155 return nil 1156} 1157 1158func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error { 1159 var threadgate ThreadGate 1160 if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1161 return err 1162 } 1163 1164 if threadgate.ID == 0 { 1165 return nil 1166 //return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey) 1167 } 1168 1169 if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil { 1170 return err 1171 } 1172 1173 return nil 1174} 1175 1176func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error { 1177 var profile Profile 1178 if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil { 1179 return err 1180 } 1181 1182 if profile.ID == 0 { 1183 return nil 1184 } 1185 1186 if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil { 1187 return err 1188 } 1189 1190 return nil 1191} 1192 1193const ( 1194 NotifKindReply = "reply" 1195 NotifKindLike = "like" 1196 NotifKindMention = "mention" 1197 NotifKindRepost = "repost" 1198) 1199 1200func (b *PostgresBackend) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error { 1201 return b.db.Create(&Notification{ 1202 For: forUser, 1203 Author: author, 1204 Source: recordUri, 1205 SourceCid: recordCid.String(), 1206 Kind: kind, 1207 }).Error 1208}