A locally focused Bluesky appview
1package backend
2
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/atproto/syntax"
	lexutil "github.com/bluesky-social/indigo/lex/util"
	"github.com/bluesky-social/indigo/repo"
	jsmodels "github.com/bluesky-social/jetstream/pkg/models"
	"github.com/ipfs/go-cid"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	. "github.com/whyrusleeping/konbini/models"
)
25
26var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
27 Name: "handle_op_duration",
28 Help: "A histogram of op handling durations",
29 Buckets: prometheus.ExponentialBuckets(1, 2, 15),
30}, []string{"op", "collection"})
31
32func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
33 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
34 if err != nil {
35 return fmt.Errorf("failed to read event repo: %w", err)
36 }
37
38 for _, op := range evt.Ops {
39 switch op.Action {
40 case "create":
41 c, rec, err := r.GetRecordBytes(ctx, op.Path)
42 if err != nil {
43 return err
44 }
45 if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
46 return fmt.Errorf("create record failed: %w", err)
47 }
48 case "update":
49 c, rec, err := r.GetRecordBytes(ctx, op.Path)
50 if err != nil {
51 return err
52 }
53 if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
54 return fmt.Errorf("update record failed: %w", err)
55 }
56 case "delete":
57 if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
58 return fmt.Errorf("delete record failed: %w", err)
59 }
60 }
61 }
62
63 // TODO: sync with the Since field to make sure we don't miss events we care about
64 /*
65 if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
66 return fmt.Errorf("failed to update rev: %w", err)
67 }
68 */
69
70 return nil
71}
72
73func cborBytesFromEvent(evt *jsmodels.Event) ([]byte, error) {
74 val, err := lexutil.NewFromType(evt.Commit.Collection)
75 if err != nil {
76 return nil, fmt.Errorf("failed to load event record type: %w", err)
77 }
78
79 if err := json.Unmarshal(evt.Commit.Record, val); err != nil {
80 return nil, err
81 }
82
83 cval, ok := val.(lexutil.CBOR)
84 if !ok {
85 return nil, fmt.Errorf("decoded type was not cbor marshalable")
86 }
87
88 buf := new(bytes.Buffer)
89 if err := cval.MarshalCBOR(buf); err != nil {
90 return nil, fmt.Errorf("failed to marshal event to cbor: %w", err)
91 }
92
93 rec := buf.Bytes()
94 return rec, nil
95}
96
97func (b *PostgresBackend) HandleEventJetstream(ctx context.Context, evt *jsmodels.Event) error {
98
99 path := evt.Commit.Collection + "/" + evt.Commit.RKey
100 switch evt.Commit.Operation {
101 case jsmodels.CommitOperationCreate:
102 rec, err := cborBytesFromEvent(evt)
103 if err != nil {
104 return err
105 }
106
107 c, err := cid.Decode(evt.Commit.CID)
108 if err != nil {
109 return err
110 }
111
112 if err := b.HandleCreate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil {
113 return fmt.Errorf("create record failed: %w", err)
114 }
115 case jsmodels.CommitOperationUpdate:
116 rec, err := cborBytesFromEvent(evt)
117 if err != nil {
118 return err
119 }
120
121 c, err := cid.Decode(evt.Commit.CID)
122 if err != nil {
123 return err
124 }
125
126 if err := b.HandleUpdate(ctx, evt.Did, evt.Commit.Rev, path, &rec, &c); err != nil {
127 return fmt.Errorf("update record failed: %w", err)
128 }
129 case jsmodels.CommitOperationDelete:
130 if err := b.HandleDelete(ctx, evt.Did, evt.Commit.Rev, path); err != nil {
131 return fmt.Errorf("delete record failed: %w", err)
132 }
133 }
134
135 return nil
136}
137
138func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
139 start := time.Now()
140
141 rr, err := b.GetOrCreateRepo(ctx, repo)
142 if err != nil {
143 return fmt.Errorf("get user failed: %w", err)
144 }
145
146 lrev, err := b.revForRepo(rr)
147 if err != nil {
148 return err
149 }
150 if lrev != "" {
151 if rev < lrev {
152 slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
153 return nil
154 }
155 }
156
157 parts := strings.Split(path, "/")
158 if len(parts) != 2 {
159 return fmt.Errorf("invalid path in HandleCreate: %q", path)
160 }
161 col := parts[0]
162 rkey := parts[1]
163
164 defer func() {
165 handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
166 }()
167
168 if rkey == "" {
169 fmt.Printf("messed up path: %q\n", rkey)
170 }
171
172 switch col {
173 case "app.bsky.feed.post":
174 if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
175 return err
176 }
177 case "app.bsky.feed.like":
178 if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
179 return err
180 }
181 case "app.bsky.feed.repost":
182 if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
183 return err
184 }
185 case "app.bsky.graph.follow":
186 if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
187 return err
188 }
189 case "app.bsky.graph.block":
190 if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
191 return err
192 }
193 case "app.bsky.graph.list":
194 if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
195 return err
196 }
197 case "app.bsky.graph.listitem":
198 if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
199 return err
200 }
201 case "app.bsky.graph.listblock":
202 if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
203 return err
204 }
205 case "app.bsky.actor.profile":
206 if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
207 return err
208 }
209 case "app.bsky.feed.generator":
210 if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
211 return err
212 }
213 case "app.bsky.feed.threadgate":
214 if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
215 return err
216 }
217 case "chat.bsky.actor.declaration":
218 if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
219 return err
220 }
221 case "app.bsky.feed.postgate":
222 if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil {
223 return err
224 }
225 case "app.bsky.graph.starterpack":
226 if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil {
227 return err
228 }
229 default:
230 slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev)
231 }
232
233 b.revCache.Add(rr.ID, rev)
234 return nil
235}
236
237func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
238 exists, err := b.checkPostExists(ctx, repo, rkey)
239 if err != nil {
240 return err
241 }
242
243 // still technically a race condition if two creates for the same post happen concurrently... probably fine
244 if exists {
245 return nil
246 }
247
248 var rec bsky.FeedPost
249 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
250 uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
251 slog.Warn("skipping post with malformed data", "uri", uri, "error", err)
252 return nil // Skip this post rather than failing the entire event
253 }
254
255 reldids := []string{repo.Did}
256 // care about a post if its in a thread of a user we are interested in
257 if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
258 reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri)
259 }
260 // TODO: maybe also care if its mentioning a user we care about or quoting a user we care about?
261 if !b.anyRelevantIdents(reldids...) {
262 return nil
263 }
264
265 uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
266 slog.Warn("adding post", "uri", uri)
267
268 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
269 if err != nil {
270 return fmt.Errorf("invalid timestamp: %w", err)
271 }
272
273 p := Post{
274 Created: created.Time(),
275 Indexed: time.Now(),
276 Author: repo.ID,
277 Rkey: rkey,
278 Raw: recb,
279 Cid: cc.String(),
280 }
281
282 if rec.Reply != nil && rec.Reply.Parent != nil {
283 if rec.Reply.Root == nil {
284 return fmt.Errorf("post reply had nil root")
285 }
286
287 pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri)
288 if err != nil {
289 return fmt.Errorf("getting reply parent: %w", err)
290 }
291
292 p.ReplyTo = pinfo.ID
293 p.ReplyToUsr = pinfo.Author
294
295 thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri)
296 if err != nil {
297 return fmt.Errorf("getting thread root: %w", err)
298 }
299
300 p.InThread = thread
301
302 r, err := b.GetOrCreateRepo(ctx, b.mydid)
303 if err != nil {
304 return err
305 }
306
307 if p.ReplyToUsr == r.ID {
308 if err := b.AddNotification(ctx, r.ID, p.Author, uri, cc, NotifKindReply); err != nil {
309 slog.Warn("failed to create notification", "uri", uri, "error", err)
310 }
311 }
312 }
313
314 if rec.Embed != nil {
315 var rpref string
316 if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
317 rpref = rec.Embed.EmbedRecord.Record.Uri
318 }
319 if rec.Embed.EmbedRecordWithMedia != nil &&
320 rec.Embed.EmbedRecordWithMedia.Record != nil &&
321 rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
322 rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
323 }
324
325 if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") {
326 rp, err := b.postIDForUri(ctx, rpref)
327 if err != nil {
328 return fmt.Errorf("getting quote subject: %w", err)
329 }
330
331 p.Reposting = rp
332 }
333 }
334
335 if err := b.doPostCreate(ctx, &p); err != nil {
336 return err
337 }
338
339 // Check for mentions and create notifications
340 if rec.Facets != nil {
341 for _, facet := range rec.Facets {
342 for _, feature := range facet.Features {
343 if feature.RichtextFacet_Mention != nil {
344 mentionDid := feature.RichtextFacet_Mention.Did
345 // This is a mention
346 mentionedRepo, err := b.GetOrCreateRepo(ctx, mentionDid)
347 if err != nil {
348 slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err)
349 continue
350 }
351
352 // Create notification if the mentioned user is the current user
353 if mentionedRepo.ID == b.myrepo.ID {
354 if err := b.AddNotification(ctx, b.myrepo.ID, p.Author, uri, cc, NotifKindMention); err != nil {
355 slog.Warn("failed to create mention notification", "uri", uri, "error", err)
356 }
357 }
358 }
359 }
360 }
361 }
362
363 b.postInfoCache.Add(uri, cachedPostInfo{
364 ID: p.ID,
365 Author: p.Author,
366 })
367
368 return nil
369}
370
371func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error {
372 /*
373 if err := b.db.Clauses(clause.OnConflict{
374 Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}},
375 DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}),
376 }).Create(p).Error; err != nil {
377 return err
378 }
379 */
380
381 query := `
382INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread)
383VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
384ON CONFLICT (author, rkey)
385DO UPDATE SET
386 cid = $3,
387 not_found = $4,
388 raw = $5,
389 created = $6,
390 indexed = $7,
391 reposting = $8,
392 reply_to = $9,
393 reply_to_usr = $10,
394 in_thread = $11
395RETURNING id
396`
397
398 // Execute the query with parameters from the Post struct
399 if err := b.pgx.QueryRow(
400 ctx,
401 query,
402 p.Author,
403 p.Rkey,
404 p.Cid,
405 p.NotFound,
406 p.Raw,
407 p.Created,
408 p.Indexed,
409 p.Reposting,
410 p.ReplyTo,
411 p.ReplyToUsr,
412 p.InThread,
413 ).Scan(&p.ID); err != nil {
414 return err
415 }
416
417 return nil
418}
419
420func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
421 var rec bsky.FeedLike
422 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
423 return err
424 }
425
426 if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
427 return nil
428 }
429
430 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
431 if err != nil {
432 return fmt.Errorf("invalid timestamp: %w", err)
433 }
434
435 pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
436 if err != nil {
437 return fmt.Errorf("getting like subject: %w", err)
438 }
439
440 if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil {
441 pgErr, ok := err.(*pgconn.PgError)
442 if ok && pgErr.Code == "23505" {
443 return nil
444 }
445 return err
446 }
447
448 // Create notification if the liked post belongs to the current user
449 if pinfo.Author == b.myrepo.ID {
450 uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey)
451 if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindLike); err != nil {
452 slog.Warn("failed to create like notification", "uri", uri, "error", err)
453 }
454 }
455
456 return nil
457}
458
459func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
460 var rec bsky.FeedRepost
461 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
462 return err
463 }
464
465 if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
466 return nil
467 }
468
469 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
470 if err != nil {
471 return fmt.Errorf("invalid timestamp: %w", err)
472 }
473
474 pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
475 if err != nil {
476 return fmt.Errorf("getting repost subject: %w", err)
477 }
478
479 if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil {
480 pgErr, ok := err.(*pgconn.PgError)
481 if ok && pgErr.Code == "23505" {
482 return nil
483 }
484 return err
485 }
486
487 // Create notification if the reposted post belongs to the current user
488 if pinfo.Author == b.myrepo.ID {
489 uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey)
490 if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindRepost); err != nil {
491 slog.Warn("failed to create repost notification", "uri", uri, "error", err)
492 }
493 }
494
495 return nil
496}
497
498func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
499 var rec bsky.GraphFollow
500 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
501 return err
502 }
503
504 if !b.anyRelevantIdents(repo.Did, rec.Subject) {
505 return nil
506 }
507
508 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
509 if err != nil {
510 return fmt.Errorf("invalid timestamp: %w", err)
511 }
512
513 subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
514 if err != nil {
515 return err
516 }
517
518 if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil {
519 return err
520 }
521
522 return nil
523}
524
525func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
526 var rec bsky.GraphBlock
527 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
528 return err
529 }
530
531 if !b.anyRelevantIdents(repo.Did, rec.Subject) {
532 return nil
533 }
534
535 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
536 if err != nil {
537 return fmt.Errorf("invalid timestamp: %w", err)
538 }
539
540 subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
541 if err != nil {
542 return err
543 }
544
545 if err := b.db.Create(&Block{
546 Created: created.Time(),
547 Indexed: time.Now(),
548 Author: repo.ID,
549 Rkey: rkey,
550 Subject: subj.ID,
551 }).Error; err != nil {
552 return err
553 }
554
555 return nil
556}
557
558func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
559 var rec bsky.GraphList
560 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
561 return err
562 }
563
564 if !b.anyRelevantIdents(repo.Did) {
565 return nil
566 }
567
568 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
569 if err != nil {
570 return fmt.Errorf("invalid timestamp: %w", err)
571 }
572
573 if err := b.db.Create(&List{
574 Created: created.Time(),
575 Indexed: time.Now(),
576 Author: repo.ID,
577 Rkey: rkey,
578 Raw: recb,
579 }).Error; err != nil {
580 return err
581 }
582
583 return nil
584}
585
586func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
587 var rec bsky.GraphListitem
588 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
589 return err
590 }
591 if !b.anyRelevantIdents(repo.Did) {
592 return nil
593 }
594
595 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
596 if err != nil {
597 return fmt.Errorf("invalid timestamp: %w", err)
598 }
599
600 subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
601 if err != nil {
602 return err
603 }
604
605 list, err := b.GetOrCreateList(ctx, rec.List)
606 if err != nil {
607 return err
608 }
609
610 if err := b.db.Create(&ListItem{
611 Created: created.Time(),
612 Indexed: time.Now(),
613 Author: repo.ID,
614 Rkey: rkey,
615 Subject: subj.ID,
616 List: list.ID,
617 }).Error; err != nil {
618 return err
619 }
620
621 return nil
622}
623
624func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
625 var rec bsky.GraphListblock
626 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
627 return err
628 }
629
630 if !b.anyRelevantIdents(repo.Did, rec.Subject) {
631 return nil
632 }
633
634 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
635 if err != nil {
636 return fmt.Errorf("invalid timestamp: %w", err)
637 }
638
639 list, err := b.GetOrCreateList(ctx, rec.Subject)
640 if err != nil {
641 return err
642 }
643
644 if err := b.db.Create(&ListBlock{
645 Created: created.Time(),
646 Indexed: time.Now(),
647 Author: repo.ID,
648 Rkey: rkey,
649 List: list.ID,
650 }).Error; err != nil {
651 return err
652 }
653
654 return nil
655}
656
657func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
658 if !b.anyRelevantIdents(repo.Did) {
659 return nil
660 }
661
662 if err := b.db.Create(&Profile{
663 //Created: created.Time(),
664 Indexed: time.Now(),
665 Repo: repo.ID,
666 Raw: recb,
667 Rev: rev,
668 }).Error; err != nil {
669 return err
670 }
671
672 return nil
673}
674
675func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
676 if !b.anyRelevantIdents(repo.Did) {
677 return nil
678 }
679
680 if err := b.db.Create(&Profile{
681 Indexed: time.Now(),
682 Repo: repo.ID,
683 Raw: recb,
684 Rev: rev,
685 }).Error; err != nil {
686 return err
687 }
688
689 return nil
690}
691
692func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
693 if !b.anyRelevantIdents(repo.Did) {
694 return nil
695 }
696
697 var rec bsky.FeedGenerator
698 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
699 return err
700 }
701
702 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
703 if err != nil {
704 return fmt.Errorf("invalid timestamp: %w", err)
705 }
706
707 if err := b.db.Create(&FeedGenerator{
708 Created: created.Time(),
709 Indexed: time.Now(),
710 Author: repo.ID,
711 Rkey: rkey,
712 Did: rec.Did,
713 Raw: recb,
714 }).Error; err != nil {
715 return err
716 }
717
718 return nil
719}
720
721func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
722 if !b.anyRelevantIdents(repo.Did) {
723 return nil
724 }
725 var rec bsky.FeedThreadgate
726 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
727 return err
728 }
729
730 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
731 if err != nil {
732 return fmt.Errorf("invalid timestamp: %w", err)
733 }
734
735 pid, err := b.postIDForUri(ctx, rec.Post)
736 if err != nil {
737 return err
738 }
739
740 if err := b.db.Create(&ThreadGate{
741 Created: created.Time(),
742 Indexed: time.Now(),
743 Author: repo.ID,
744 Rkey: rkey,
745 Post: pid,
746 }).Error; err != nil {
747 return err
748 }
749
750 return nil
751}
752
753func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
754 // TODO: maybe track these?
755 return nil
756}
757
758func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
759 if !b.anyRelevantIdents(repo.Did) {
760 return nil
761 }
762 var rec bsky.FeedPostgate
763 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
764 return err
765 }
766 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
767 if err != nil {
768 return fmt.Errorf("invalid timestamp: %w", err)
769 }
770
771 refPost, err := b.postInfoForUri(ctx, rec.Post)
772 if err != nil {
773 return err
774 }
775
776 if err := b.db.Create(&PostGate{
777 Created: created.Time(),
778 Indexed: time.Now(),
779 Author: repo.ID,
780 Rkey: rkey,
781 Subject: refPost.ID,
782 Raw: recb,
783 }).Error; err != nil {
784 return err
785 }
786
787 return nil
788}
789
790func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
791 if !b.anyRelevantIdents(repo.Did) {
792 return nil
793 }
794 var rec bsky.GraphStarterpack
795 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
796 return err
797 }
798 created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
799 if err != nil {
800 return fmt.Errorf("invalid timestamp: %w", err)
801 }
802
803 list, err := b.GetOrCreateList(ctx, rec.List)
804 if err != nil {
805 return err
806 }
807
808 if err := b.db.Create(&StarterPack{
809 Created: created.Time(),
810 Indexed: time.Now(),
811 Author: repo.ID,
812 Rkey: rkey,
813 Raw: recb,
814 List: list.ID,
815 }).Error; err != nil {
816 return err
817 }
818
819 return nil
820}
821
822func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
823 start := time.Now()
824
825 rr, err := b.GetOrCreateRepo(ctx, repo)
826 if err != nil {
827 return fmt.Errorf("get user failed: %w", err)
828 }
829
830 lrev, err := b.revForRepo(rr)
831 if err != nil {
832 return err
833 }
834 if lrev != "" {
835 if rev < lrev {
836 //slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
837 return nil
838 }
839 }
840
841 parts := strings.Split(path, "/")
842 if len(parts) != 2 {
843 return fmt.Errorf("invalid path in HandleCreate: %q", path)
844 }
845 col := parts[0]
846 rkey := parts[1]
847
848 defer func() {
849 handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds()))
850 }()
851
852 if rkey == "" {
853 fmt.Printf("messed up path: %q\n", rkey)
854 }
855
856 switch col {
857 /*
858 case "app.bsky.feed.post":
859 if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
860 return err
861 }
862 case "app.bsky.feed.like":
863 if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
864 return err
865 }
866 case "app.bsky.feed.repost":
867 if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
868 return err
869 }
870 case "app.bsky.graph.follow":
871 if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
872 return err
873 }
874 case "app.bsky.graph.block":
875 if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
876 return err
877 }
878 case "app.bsky.graph.list":
879 if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
880 return err
881 }
882 case "app.bsky.graph.listitem":
883 if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
884 return err
885 }
886 case "app.bsky.graph.listblock":
887 if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
888 return err
889 }
890 */
891 case "app.bsky.actor.profile":
892 if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
893 return err
894 }
895 /*
896 case "app.bsky.feed.generator":
897 if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
898 return err
899 }
900 case "app.bsky.feed.threadgate":
901 if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
902 return err
903 }
904 case "chat.bsky.actor.declaration":
905 if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
906 return err
907 }
908 */
909 default:
910 slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev)
911 }
912
913 return nil
914}
915
916func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
917 start := time.Now()
918
919 rr, err := b.GetOrCreateRepo(ctx, repo)
920 if err != nil {
921 return fmt.Errorf("get user failed: %w", err)
922 }
923
924 lrev, ok := b.revCache.Get(rr.ID)
925 if ok {
926 if rev < lrev {
927 //slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev)
928 return nil
929 }
930 }
931
932 parts := strings.Split(path, "/")
933 if len(parts) != 2 {
934 return fmt.Errorf("invalid path in HandleDelete: %q", path)
935 }
936 col := parts[0]
937 rkey := parts[1]
938
939 defer func() {
940 handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
941 }()
942
943 switch col {
944 case "app.bsky.feed.post":
945 if err := b.HandleDeletePost(ctx, rr, rkey); err != nil {
946 return err
947 }
948 case "app.bsky.feed.like":
949 if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil {
950 return err
951 }
952 case "app.bsky.feed.repost":
953 if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil {
954 return err
955 }
956 case "app.bsky.graph.follow":
957 if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil {
958 return err
959 }
960 case "app.bsky.graph.block":
961 if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil {
962 return err
963 }
964 case "app.bsky.graph.list":
965 if err := b.HandleDeleteList(ctx, rr, rkey); err != nil {
966 return err
967 }
968 case "app.bsky.graph.listitem":
969 if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil {
970 return err
971 }
972 case "app.bsky.graph.listblock":
973 if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil {
974 return err
975 }
976 case "app.bsky.actor.profile":
977 if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil {
978 return err
979 }
980 case "app.bsky.feed.generator":
981 if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil {
982 return err
983 }
984 case "app.bsky.feed.threadgate":
985 if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil {
986 return err
987 }
988 default:
989 slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev)
990 }
991
992 b.revCache.Add(rr.ID, rev)
993 return nil
994}
995
996func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error {
997 var p Post
998 if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
999 return err
1000 }
1001
1002 if p.ID == 0 {
1003 //slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey)
1004 return nil
1005 }
1006
1007 if err := b.db.Delete(&Post{}, p.ID).Error; err != nil {
1008 return err
1009 }
1010
1011 return nil
1012}
1013
1014func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error {
1015 var like Like
1016 if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1017 return err
1018 }
1019
1020 if like.ID == 0 {
1021 //slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey)
1022 return nil
1023 }
1024
1025 if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil {
1026 return err
1027 }
1028
1029 return nil
1030}
1031
1032func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error {
1033 var repost Repost
1034 if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1035 return err
1036 }
1037
1038 if repost.ID == 0 {
1039 //return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey)
1040 return nil
1041 }
1042
1043 if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil {
1044 return err
1045 }
1046
1047 return nil
1048}
1049
1050func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error {
1051 var follow Follow
1052 if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1053 return err
1054 }
1055
1056 if follow.ID == 0 {
1057 //slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey)
1058 return nil
1059 }
1060
1061 if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil {
1062 return err
1063 }
1064
1065 return nil
1066}
1067
1068func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error {
1069 var block Block
1070 if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1071 return err
1072 }
1073
1074 if block.ID == 0 {
1075 //slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey)
1076 return nil
1077 }
1078
1079 if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil {
1080 return err
1081 }
1082
1083 return nil
1084}
1085
1086func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error {
1087 var list List
1088 if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1089 return err
1090 }
1091
1092 if list.ID == 0 {
1093 return nil
1094 //return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey)
1095 }
1096
1097 if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil {
1098 return err
1099 }
1100
1101 return nil
1102}
1103
1104func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error {
1105 var item ListItem
1106 if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1107 return err
1108 }
1109
1110 if item.ID == 0 {
1111 return nil
1112 //return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey)
1113 }
1114
1115 if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil {
1116 return err
1117 }
1118
1119 return nil
1120}
1121
1122func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error {
1123 var block ListBlock
1124 if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1125 return err
1126 }
1127
1128 if block.ID == 0 {
1129 return nil
1130 //return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey)
1131 }
1132
1133 if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil {
1134 return err
1135 }
1136
1137 return nil
1138}
1139
1140func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error {
1141 var feedgen FeedGenerator
1142 if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1143 return err
1144 }
1145
1146 if feedgen.ID == 0 {
1147 return nil
1148 //return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey)
1149 }
1150
1151 if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil {
1152 return err
1153 }
1154
1155 return nil
1156}
1157
1158func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error {
1159 var threadgate ThreadGate
1160 if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1161 return err
1162 }
1163
1164 if threadgate.ID == 0 {
1165 return nil
1166 //return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey)
1167 }
1168
1169 if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil {
1170 return err
1171 }
1172
1173 return nil
1174}
1175
1176func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error {
1177 var profile Profile
1178 if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil {
1179 return err
1180 }
1181
1182 if profile.ID == 0 {
1183 return nil
1184 }
1185
1186 if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil {
1187 return err
1188 }
1189
1190 return nil
1191}
1192
// Notification kinds passed as the kind argument of AddNotification and
// stored in a Notification's Kind field.
const (
	// NotifKindReply marks a notification for a reply to one of the user's posts.
	NotifKindReply = "reply"
	// NotifKindLike marks a notification for a like of one of the user's posts.
	NotifKindLike = "like"
	// NotifKindMention marks a notification for a post that mentions the user.
	NotifKindMention = "mention"
	// NotifKindRepost marks a notification for a repost of one of the user's posts.
	NotifKindRepost = "repost"
)
1199
1200func (b *PostgresBackend) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error {
1201 return b.db.Create(&Notification{
1202 For: forUser,
1203 Author: author,
1204 Source: recordUri,
1205 SourceCid: recordCid.String(),
1206 Kind: kind,
1207 }).Error
1208}