+1140
backend/events.go
+1140
backend/events.go
···
1
+
package backend
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"log/slog"
8
+
"strings"
9
+
"time"
10
+
11
+
"github.com/bluesky-social/indigo/api/atproto"
12
+
"github.com/bluesky-social/indigo/api/bsky"
13
+
"github.com/bluesky-social/indigo/atproto/syntax"
14
+
"github.com/bluesky-social/indigo/repo"
15
+
"github.com/ipfs/go-cid"
16
+
"github.com/jackc/pgx/v5/pgconn"
17
+
"github.com/prometheus/client_golang/prometheus"
18
+
"github.com/prometheus/client_golang/prometheus/promauto"
19
+
20
+
. "github.com/whyrusleeping/konbini/models"
21
+
)
22
+
23
+
var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
24
+
Name: "handle_op_duration",
25
+
Help: "A histogram of op handling durations",
26
+
Buckets: prometheus.ExponentialBuckets(1, 2, 15),
27
+
}, []string{"op", "collection"})
28
+
29
+
func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
30
+
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
31
+
if err != nil {
32
+
return fmt.Errorf("failed to read event repo: %w", err)
33
+
}
34
+
35
+
for _, op := range evt.Ops {
36
+
switch op.Action {
37
+
case "create":
38
+
c, rec, err := r.GetRecordBytes(ctx, op.Path)
39
+
if err != nil {
40
+
return err
41
+
}
42
+
if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
43
+
return fmt.Errorf("create record failed: %w", err)
44
+
}
45
+
case "update":
46
+
c, rec, err := r.GetRecordBytes(ctx, op.Path)
47
+
if err != nil {
48
+
return err
49
+
}
50
+
if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
51
+
return fmt.Errorf("update record failed: %w", err)
52
+
}
53
+
case "delete":
54
+
if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
55
+
return fmt.Errorf("delete record failed: %w", err)
56
+
}
57
+
}
58
+
}
59
+
60
+
// TODO: sync with the Since field to make sure we don't miss events we care about
61
+
/*
62
+
if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
63
+
return fmt.Errorf("failed to update rev: %w", err)
64
+
}
65
+
*/
66
+
67
+
return nil
68
+
}
69
+
70
+
func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
71
+
start := time.Now()
72
+
73
+
rr, err := b.GetOrCreateRepo(ctx, repo)
74
+
if err != nil {
75
+
return fmt.Errorf("get user failed: %w", err)
76
+
}
77
+
78
+
lrev, err := b.revForRepo(rr)
79
+
if err != nil {
80
+
return err
81
+
}
82
+
if lrev != "" {
83
+
if rev < lrev {
84
+
slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
85
+
return nil
86
+
}
87
+
}
88
+
89
+
parts := strings.Split(path, "/")
90
+
if len(parts) != 2 {
91
+
return fmt.Errorf("invalid path in HandleCreate: %q", path)
92
+
}
93
+
col := parts[0]
94
+
rkey := parts[1]
95
+
96
+
defer func() {
97
+
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
98
+
}()
99
+
100
+
if rkey == "" {
101
+
fmt.Printf("messed up path: %q\n", rkey)
102
+
}
103
+
104
+
switch col {
105
+
case "app.bsky.feed.post":
106
+
if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
107
+
return err
108
+
}
109
+
case "app.bsky.feed.like":
110
+
if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
111
+
return err
112
+
}
113
+
case "app.bsky.feed.repost":
114
+
if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
115
+
return err
116
+
}
117
+
case "app.bsky.graph.follow":
118
+
if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
119
+
return err
120
+
}
121
+
case "app.bsky.graph.block":
122
+
if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
123
+
return err
124
+
}
125
+
case "app.bsky.graph.list":
126
+
if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
127
+
return err
128
+
}
129
+
case "app.bsky.graph.listitem":
130
+
if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
131
+
return err
132
+
}
133
+
case "app.bsky.graph.listblock":
134
+
if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
135
+
return err
136
+
}
137
+
case "app.bsky.actor.profile":
138
+
if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
139
+
return err
140
+
}
141
+
case "app.bsky.feed.generator":
142
+
if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
143
+
return err
144
+
}
145
+
case "app.bsky.feed.threadgate":
146
+
if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
147
+
return err
148
+
}
149
+
case "chat.bsky.actor.declaration":
150
+
if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
151
+
return err
152
+
}
153
+
case "app.bsky.feed.postgate":
154
+
if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil {
155
+
return err
156
+
}
157
+
case "app.bsky.graph.starterpack":
158
+
if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil {
159
+
return err
160
+
}
161
+
default:
162
+
slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev)
163
+
}
164
+
165
+
b.revCache.Add(rr.ID, rev)
166
+
return nil
167
+
}
168
+
169
+
func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
170
+
exists, err := b.checkPostExists(ctx, repo, rkey)
171
+
if err != nil {
172
+
return err
173
+
}
174
+
175
+
// still technically a race condition if two creates for the same post happen concurrently... probably fine
176
+
if exists {
177
+
return nil
178
+
}
179
+
180
+
var rec bsky.FeedPost
181
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
182
+
uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
183
+
slog.Warn("skipping post with malformed data", "uri", uri, "error", err)
184
+
return nil // Skip this post rather than failing the entire event
185
+
}
186
+
187
+
reldids := []string{repo.Did}
188
+
// care about a post if its in a thread of a user we are interested in
189
+
if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
190
+
reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri)
191
+
}
192
+
// TODO: maybe also care if its mentioning a user we care about or quoting a user we care about?
193
+
if !b.anyRelevantIdents(reldids...) {
194
+
return nil
195
+
}
196
+
197
+
uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
198
+
slog.Warn("adding post", "uri", uri)
199
+
200
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
201
+
if err != nil {
202
+
return fmt.Errorf("invalid timestamp: %w", err)
203
+
}
204
+
205
+
p := Post{
206
+
Created: created.Time(),
207
+
Indexed: time.Now(),
208
+
Author: repo.ID,
209
+
Rkey: rkey,
210
+
Raw: recb,
211
+
Cid: cc.String(),
212
+
}
213
+
214
+
if rec.Reply != nil && rec.Reply.Parent != nil {
215
+
if rec.Reply.Root == nil {
216
+
return fmt.Errorf("post reply had nil root")
217
+
}
218
+
219
+
pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri)
220
+
if err != nil {
221
+
return fmt.Errorf("getting reply parent: %w", err)
222
+
}
223
+
224
+
p.ReplyTo = pinfo.ID
225
+
p.ReplyToUsr = pinfo.Author
226
+
227
+
thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri)
228
+
if err != nil {
229
+
return fmt.Errorf("getting thread root: %w", err)
230
+
}
231
+
232
+
p.InThread = thread
233
+
234
+
r, err := b.GetOrCreateRepo(ctx, b.mydid)
235
+
if err != nil {
236
+
return err
237
+
}
238
+
239
+
if p.ReplyToUsr == r.ID {
240
+
if err := b.AddNotification(ctx, r.ID, p.Author, uri, cc, NotifKindReply); err != nil {
241
+
slog.Warn("failed to create notification", "uri", uri, "error", err)
242
+
}
243
+
}
244
+
}
245
+
246
+
if rec.Embed != nil {
247
+
var rpref string
248
+
if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
249
+
rpref = rec.Embed.EmbedRecord.Record.Uri
250
+
}
251
+
if rec.Embed.EmbedRecordWithMedia != nil &&
252
+
rec.Embed.EmbedRecordWithMedia.Record != nil &&
253
+
rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
254
+
rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
255
+
}
256
+
257
+
if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") {
258
+
rp, err := b.postIDForUri(ctx, rpref)
259
+
if err != nil {
260
+
return fmt.Errorf("getting quote subject: %w", err)
261
+
}
262
+
263
+
p.Reposting = rp
264
+
}
265
+
}
266
+
267
+
if err := b.doPostCreate(ctx, &p); err != nil {
268
+
return err
269
+
}
270
+
271
+
// Check for mentions and create notifications
272
+
if rec.Facets != nil {
273
+
for _, facet := range rec.Facets {
274
+
for _, feature := range facet.Features {
275
+
if feature.RichtextFacet_Mention != nil {
276
+
mentionDid := feature.RichtextFacet_Mention.Did
277
+
// This is a mention
278
+
mentionedRepo, err := b.GetOrCreateRepo(ctx, mentionDid)
279
+
if err != nil {
280
+
slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err)
281
+
continue
282
+
}
283
+
284
+
// Create notification if the mentioned user is the current user
285
+
if mentionedRepo.ID == b.myrepo.ID {
286
+
if err := b.AddNotification(ctx, b.myrepo.ID, p.Author, uri, cc, NotifKindMention); err != nil {
287
+
slog.Warn("failed to create mention notification", "uri", uri, "error", err)
288
+
}
289
+
}
290
+
}
291
+
}
292
+
}
293
+
}
294
+
295
+
b.postInfoCache.Add(uri, cachedPostInfo{
296
+
ID: p.ID,
297
+
Author: p.Author,
298
+
})
299
+
300
+
return nil
301
+
}
302
+
303
+
func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error {
304
+
/*
305
+
if err := b.db.Clauses(clause.OnConflict{
306
+
Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}},
307
+
DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}),
308
+
}).Create(p).Error; err != nil {
309
+
return err
310
+
}
311
+
*/
312
+
313
+
query := `
314
+
INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread)
315
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
316
+
ON CONFLICT (author, rkey)
317
+
DO UPDATE SET
318
+
cid = $3,
319
+
not_found = $4,
320
+
raw = $5,
321
+
created = $6,
322
+
indexed = $7,
323
+
reposting = $8,
324
+
reply_to = $9,
325
+
reply_to_usr = $10,
326
+
in_thread = $11
327
+
RETURNING id
328
+
`
329
+
330
+
// Execute the query with parameters from the Post struct
331
+
if err := b.pgx.QueryRow(
332
+
ctx,
333
+
query,
334
+
p.Author,
335
+
p.Rkey,
336
+
p.Cid,
337
+
p.NotFound,
338
+
p.Raw,
339
+
p.Created,
340
+
p.Indexed,
341
+
p.Reposting,
342
+
p.ReplyTo,
343
+
p.ReplyToUsr,
344
+
p.InThread,
345
+
).Scan(&p.ID); err != nil {
346
+
return err
347
+
}
348
+
349
+
return nil
350
+
}
351
+
352
+
func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
353
+
var rec bsky.FeedLike
354
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
355
+
return err
356
+
}
357
+
358
+
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
359
+
return nil
360
+
}
361
+
362
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
363
+
if err != nil {
364
+
return fmt.Errorf("invalid timestamp: %w", err)
365
+
}
366
+
367
+
pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
368
+
if err != nil {
369
+
return fmt.Errorf("getting like subject: %w", err)
370
+
}
371
+
372
+
if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil {
373
+
pgErr, ok := err.(*pgconn.PgError)
374
+
if ok && pgErr.Code == "23505" {
375
+
return nil
376
+
}
377
+
return err
378
+
}
379
+
380
+
// Create notification if the liked post belongs to the current user
381
+
if pinfo.Author == b.myrepo.ID {
382
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey)
383
+
if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindLike); err != nil {
384
+
slog.Warn("failed to create like notification", "uri", uri, "error", err)
385
+
}
386
+
}
387
+
388
+
return nil
389
+
}
390
+
391
+
func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
392
+
var rec bsky.FeedRepost
393
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
394
+
return err
395
+
}
396
+
397
+
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
398
+
return nil
399
+
}
400
+
401
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
402
+
if err != nil {
403
+
return fmt.Errorf("invalid timestamp: %w", err)
404
+
}
405
+
406
+
pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
407
+
if err != nil {
408
+
return fmt.Errorf("getting repost subject: %w", err)
409
+
}
410
+
411
+
if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil {
412
+
pgErr, ok := err.(*pgconn.PgError)
413
+
if ok && pgErr.Code == "23505" {
414
+
return nil
415
+
}
416
+
return err
417
+
}
418
+
419
+
// Create notification if the reposted post belongs to the current user
420
+
if pinfo.Author == b.myrepo.ID {
421
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey)
422
+
if err := b.AddNotification(ctx, b.myrepo.ID, repo.ID, uri, cc, NotifKindRepost); err != nil {
423
+
slog.Warn("failed to create repost notification", "uri", uri, "error", err)
424
+
}
425
+
}
426
+
427
+
return nil
428
+
}
429
+
430
+
func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
431
+
var rec bsky.GraphFollow
432
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
433
+
return err
434
+
}
435
+
436
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
437
+
return nil
438
+
}
439
+
440
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
441
+
if err != nil {
442
+
return fmt.Errorf("invalid timestamp: %w", err)
443
+
}
444
+
445
+
subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
446
+
if err != nil {
447
+
return err
448
+
}
449
+
450
+
if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil {
451
+
return err
452
+
}
453
+
454
+
return nil
455
+
}
456
+
457
+
func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
458
+
var rec bsky.GraphBlock
459
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
460
+
return err
461
+
}
462
+
463
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
464
+
return nil
465
+
}
466
+
467
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
468
+
if err != nil {
469
+
return fmt.Errorf("invalid timestamp: %w", err)
470
+
}
471
+
472
+
subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
473
+
if err != nil {
474
+
return err
475
+
}
476
+
477
+
if err := b.db.Create(&Block{
478
+
Created: created.Time(),
479
+
Indexed: time.Now(),
480
+
Author: repo.ID,
481
+
Rkey: rkey,
482
+
Subject: subj.ID,
483
+
}).Error; err != nil {
484
+
return err
485
+
}
486
+
487
+
return nil
488
+
}
489
+
490
+
func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
491
+
var rec bsky.GraphList
492
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
493
+
return err
494
+
}
495
+
496
+
if !b.anyRelevantIdents(repo.Did) {
497
+
return nil
498
+
}
499
+
500
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
501
+
if err != nil {
502
+
return fmt.Errorf("invalid timestamp: %w", err)
503
+
}
504
+
505
+
if err := b.db.Create(&List{
506
+
Created: created.Time(),
507
+
Indexed: time.Now(),
508
+
Author: repo.ID,
509
+
Rkey: rkey,
510
+
Raw: recb,
511
+
}).Error; err != nil {
512
+
return err
513
+
}
514
+
515
+
return nil
516
+
}
517
+
518
+
func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
519
+
var rec bsky.GraphListitem
520
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
521
+
return err
522
+
}
523
+
if !b.anyRelevantIdents(repo.Did) {
524
+
return nil
525
+
}
526
+
527
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
528
+
if err != nil {
529
+
return fmt.Errorf("invalid timestamp: %w", err)
530
+
}
531
+
532
+
subj, err := b.GetOrCreateRepo(ctx, rec.Subject)
533
+
if err != nil {
534
+
return err
535
+
}
536
+
537
+
list, err := b.GetOrCreateList(ctx, rec.List)
538
+
if err != nil {
539
+
return err
540
+
}
541
+
542
+
if err := b.db.Create(&ListItem{
543
+
Created: created.Time(),
544
+
Indexed: time.Now(),
545
+
Author: repo.ID,
546
+
Rkey: rkey,
547
+
Subject: subj.ID,
548
+
List: list.ID,
549
+
}).Error; err != nil {
550
+
return err
551
+
}
552
+
553
+
return nil
554
+
}
555
+
556
+
func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
557
+
var rec bsky.GraphListblock
558
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
559
+
return err
560
+
}
561
+
562
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
563
+
return nil
564
+
}
565
+
566
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
567
+
if err != nil {
568
+
return fmt.Errorf("invalid timestamp: %w", err)
569
+
}
570
+
571
+
list, err := b.GetOrCreateList(ctx, rec.Subject)
572
+
if err != nil {
573
+
return err
574
+
}
575
+
576
+
if err := b.db.Create(&ListBlock{
577
+
Created: created.Time(),
578
+
Indexed: time.Now(),
579
+
Author: repo.ID,
580
+
Rkey: rkey,
581
+
List: list.ID,
582
+
}).Error; err != nil {
583
+
return err
584
+
}
585
+
586
+
return nil
587
+
}
588
+
589
+
func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
590
+
if !b.anyRelevantIdents(repo.Did) {
591
+
return nil
592
+
}
593
+
594
+
if err := b.db.Create(&Profile{
595
+
//Created: created.Time(),
596
+
Indexed: time.Now(),
597
+
Repo: repo.ID,
598
+
Raw: recb,
599
+
Rev: rev,
600
+
}).Error; err != nil {
601
+
return err
602
+
}
603
+
604
+
return nil
605
+
}
606
+
607
+
func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
608
+
if !b.anyRelevantIdents(repo.Did) {
609
+
return nil
610
+
}
611
+
612
+
if err := b.db.Create(&Profile{
613
+
Indexed: time.Now(),
614
+
Repo: repo.ID,
615
+
Raw: recb,
616
+
Rev: rev,
617
+
}).Error; err != nil {
618
+
return err
619
+
}
620
+
621
+
return nil
622
+
}
623
+
624
+
func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
625
+
if !b.anyRelevantIdents(repo.Did) {
626
+
return nil
627
+
}
628
+
629
+
var rec bsky.FeedGenerator
630
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
631
+
return err
632
+
}
633
+
634
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
635
+
if err != nil {
636
+
return fmt.Errorf("invalid timestamp: %w", err)
637
+
}
638
+
639
+
if err := b.db.Create(&FeedGenerator{
640
+
Created: created.Time(),
641
+
Indexed: time.Now(),
642
+
Author: repo.ID,
643
+
Rkey: rkey,
644
+
Did: rec.Did,
645
+
Raw: recb,
646
+
}).Error; err != nil {
647
+
return err
648
+
}
649
+
650
+
return nil
651
+
}
652
+
653
+
func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
654
+
if !b.anyRelevantIdents(repo.Did) {
655
+
return nil
656
+
}
657
+
var rec bsky.FeedThreadgate
658
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
659
+
return err
660
+
}
661
+
662
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
663
+
if err != nil {
664
+
return fmt.Errorf("invalid timestamp: %w", err)
665
+
}
666
+
667
+
pid, err := b.postIDForUri(ctx, rec.Post)
668
+
if err != nil {
669
+
return err
670
+
}
671
+
672
+
if err := b.db.Create(&ThreadGate{
673
+
Created: created.Time(),
674
+
Indexed: time.Now(),
675
+
Author: repo.ID,
676
+
Rkey: rkey,
677
+
Post: pid,
678
+
}).Error; err != nil {
679
+
return err
680
+
}
681
+
682
+
return nil
683
+
}
684
+
685
+
func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
686
+
// TODO: maybe track these?
687
+
return nil
688
+
}
689
+
690
+
func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
691
+
if !b.anyRelevantIdents(repo.Did) {
692
+
return nil
693
+
}
694
+
var rec bsky.FeedPostgate
695
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
696
+
return err
697
+
}
698
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
699
+
if err != nil {
700
+
return fmt.Errorf("invalid timestamp: %w", err)
701
+
}
702
+
703
+
refPost, err := b.postInfoForUri(ctx, rec.Post)
704
+
if err != nil {
705
+
return err
706
+
}
707
+
708
+
if err := b.db.Create(&PostGate{
709
+
Created: created.Time(),
710
+
Indexed: time.Now(),
711
+
Author: repo.ID,
712
+
Rkey: rkey,
713
+
Subject: refPost.ID,
714
+
Raw: recb,
715
+
}).Error; err != nil {
716
+
return err
717
+
}
718
+
719
+
return nil
720
+
}
721
+
722
+
func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
723
+
if !b.anyRelevantIdents(repo.Did) {
724
+
return nil
725
+
}
726
+
var rec bsky.GraphStarterpack
727
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
728
+
return err
729
+
}
730
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
731
+
if err != nil {
732
+
return fmt.Errorf("invalid timestamp: %w", err)
733
+
}
734
+
735
+
list, err := b.GetOrCreateList(ctx, rec.List)
736
+
if err != nil {
737
+
return err
738
+
}
739
+
740
+
if err := b.db.Create(&StarterPack{
741
+
Created: created.Time(),
742
+
Indexed: time.Now(),
743
+
Author: repo.ID,
744
+
Rkey: rkey,
745
+
Raw: recb,
746
+
List: list.ID,
747
+
}).Error; err != nil {
748
+
return err
749
+
}
750
+
751
+
return nil
752
+
}
753
+
754
+
func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
755
+
start := time.Now()
756
+
757
+
rr, err := b.GetOrCreateRepo(ctx, repo)
758
+
if err != nil {
759
+
return fmt.Errorf("get user failed: %w", err)
760
+
}
761
+
762
+
lrev, err := b.revForRepo(rr)
763
+
if err != nil {
764
+
return err
765
+
}
766
+
if lrev != "" {
767
+
if rev < lrev {
768
+
//slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
769
+
return nil
770
+
}
771
+
}
772
+
773
+
parts := strings.Split(path, "/")
774
+
if len(parts) != 2 {
775
+
return fmt.Errorf("invalid path in HandleCreate: %q", path)
776
+
}
777
+
col := parts[0]
778
+
rkey := parts[1]
779
+
780
+
defer func() {
781
+
handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds()))
782
+
}()
783
+
784
+
if rkey == "" {
785
+
fmt.Printf("messed up path: %q\n", rkey)
786
+
}
787
+
788
+
switch col {
789
+
/*
790
+
case "app.bsky.feed.post":
791
+
if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
792
+
return err
793
+
}
794
+
case "app.bsky.feed.like":
795
+
if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
796
+
return err
797
+
}
798
+
case "app.bsky.feed.repost":
799
+
if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
800
+
return err
801
+
}
802
+
case "app.bsky.graph.follow":
803
+
if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
804
+
return err
805
+
}
806
+
case "app.bsky.graph.block":
807
+
if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
808
+
return err
809
+
}
810
+
case "app.bsky.graph.list":
811
+
if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
812
+
return err
813
+
}
814
+
case "app.bsky.graph.listitem":
815
+
if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
816
+
return err
817
+
}
818
+
case "app.bsky.graph.listblock":
819
+
if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
820
+
return err
821
+
}
822
+
*/
823
+
case "app.bsky.actor.profile":
824
+
if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
825
+
return err
826
+
}
827
+
/*
828
+
case "app.bsky.feed.generator":
829
+
if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
830
+
return err
831
+
}
832
+
case "app.bsky.feed.threadgate":
833
+
if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
834
+
return err
835
+
}
836
+
case "chat.bsky.actor.declaration":
837
+
if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
838
+
return err
839
+
}
840
+
*/
841
+
default:
842
+
slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev)
843
+
}
844
+
845
+
return nil
846
+
}
847
+
848
+
func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
849
+
start := time.Now()
850
+
851
+
rr, err := b.GetOrCreateRepo(ctx, repo)
852
+
if err != nil {
853
+
return fmt.Errorf("get user failed: %w", err)
854
+
}
855
+
856
+
lrev, ok := b.revCache.Get(rr.ID)
857
+
if ok {
858
+
if rev < lrev {
859
+
//slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev)
860
+
return nil
861
+
}
862
+
}
863
+
864
+
parts := strings.Split(path, "/")
865
+
if len(parts) != 2 {
866
+
return fmt.Errorf("invalid path in HandleDelete: %q", path)
867
+
}
868
+
col := parts[0]
869
+
rkey := parts[1]
870
+
871
+
defer func() {
872
+
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
873
+
}()
874
+
875
+
switch col {
876
+
case "app.bsky.feed.post":
877
+
if err := b.HandleDeletePost(ctx, rr, rkey); err != nil {
878
+
return err
879
+
}
880
+
case "app.bsky.feed.like":
881
+
if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil {
882
+
return err
883
+
}
884
+
case "app.bsky.feed.repost":
885
+
if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil {
886
+
return err
887
+
}
888
+
case "app.bsky.graph.follow":
889
+
if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil {
890
+
return err
891
+
}
892
+
case "app.bsky.graph.block":
893
+
if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil {
894
+
return err
895
+
}
896
+
case "app.bsky.graph.list":
897
+
if err := b.HandleDeleteList(ctx, rr, rkey); err != nil {
898
+
return err
899
+
}
900
+
case "app.bsky.graph.listitem":
901
+
if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil {
902
+
return err
903
+
}
904
+
case "app.bsky.graph.listblock":
905
+
if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil {
906
+
return err
907
+
}
908
+
case "app.bsky.actor.profile":
909
+
if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil {
910
+
return err
911
+
}
912
+
case "app.bsky.feed.generator":
913
+
if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil {
914
+
return err
915
+
}
916
+
case "app.bsky.feed.threadgate":
917
+
if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil {
918
+
return err
919
+
}
920
+
default:
921
+
slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev)
922
+
}
923
+
924
+
b.revCache.Add(rr.ID, rev)
925
+
return nil
926
+
}
927
+
928
+
func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error {
929
+
var p Post
930
+
if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
931
+
return err
932
+
}
933
+
934
+
if p.ID == 0 {
935
+
//slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey)
936
+
return nil
937
+
}
938
+
939
+
if err := b.db.Delete(&Post{}, p.ID).Error; err != nil {
940
+
return err
941
+
}
942
+
943
+
return nil
944
+
}
945
+
946
+
func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error {
947
+
var like Like
948
+
if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
949
+
return err
950
+
}
951
+
952
+
if like.ID == 0 {
953
+
//slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey)
954
+
return nil
955
+
}
956
+
957
+
if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil {
958
+
return err
959
+
}
960
+
961
+
return nil
962
+
}
963
+
964
+
func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error {
965
+
var repost Repost
966
+
if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
967
+
return err
968
+
}
969
+
970
+
if repost.ID == 0 {
971
+
//return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey)
972
+
return nil
973
+
}
974
+
975
+
if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil {
976
+
return err
977
+
}
978
+
979
+
return nil
980
+
}
981
+
982
+
func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error {
983
+
var follow Follow
984
+
if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
985
+
return err
986
+
}
987
+
988
+
if follow.ID == 0 {
989
+
//slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey)
990
+
return nil
991
+
}
992
+
993
+
if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil {
994
+
return err
995
+
}
996
+
997
+
return nil
998
+
}
999
+
1000
+
func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error {
1001
+
var block Block
1002
+
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1003
+
return err
1004
+
}
1005
+
1006
+
if block.ID == 0 {
1007
+
//slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey)
1008
+
return nil
1009
+
}
1010
+
1011
+
if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil {
1012
+
return err
1013
+
}
1014
+
1015
+
return nil
1016
+
}
1017
+
1018
+
func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error {
1019
+
var list List
1020
+
if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1021
+
return err
1022
+
}
1023
+
1024
+
if list.ID == 0 {
1025
+
return nil
1026
+
//return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey)
1027
+
}
1028
+
1029
+
if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil {
1030
+
return err
1031
+
}
1032
+
1033
+
return nil
1034
+
}
1035
+
1036
+
func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error {
1037
+
var item ListItem
1038
+
if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1039
+
return err
1040
+
}
1041
+
1042
+
if item.ID == 0 {
1043
+
return nil
1044
+
//return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey)
1045
+
}
1046
+
1047
+
if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil {
1048
+
return err
1049
+
}
1050
+
1051
+
return nil
1052
+
}
1053
+
1054
+
func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error {
1055
+
var block ListBlock
1056
+
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1057
+
return err
1058
+
}
1059
+
1060
+
if block.ID == 0 {
1061
+
return nil
1062
+
//return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey)
1063
+
}
1064
+
1065
+
if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil {
1066
+
return err
1067
+
}
1068
+
1069
+
return nil
1070
+
}
1071
+
1072
+
func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error {
1073
+
var feedgen FeedGenerator
1074
+
if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1075
+
return err
1076
+
}
1077
+
1078
+
if feedgen.ID == 0 {
1079
+
return nil
1080
+
//return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey)
1081
+
}
1082
+
1083
+
if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil {
1084
+
return err
1085
+
}
1086
+
1087
+
return nil
1088
+
}
1089
+
1090
+
func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error {
1091
+
var threadgate ThreadGate
1092
+
if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1093
+
return err
1094
+
}
1095
+
1096
+
if threadgate.ID == 0 {
1097
+
return nil
1098
+
//return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey)
1099
+
}
1100
+
1101
+
if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil {
1102
+
return err
1103
+
}
1104
+
1105
+
return nil
1106
+
}
1107
+
1108
+
func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error {
1109
+
var profile Profile
1110
+
if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil {
1111
+
return err
1112
+
}
1113
+
1114
+
if profile.ID == 0 {
1115
+
return nil
1116
+
}
1117
+
1118
+
if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil {
1119
+
return err
1120
+
}
1121
+
1122
+
return nil
1123
+
}
1124
+
1125
+
const (
1126
+
NotifKindReply = "reply"
1127
+
NotifKindLike = "like"
1128
+
NotifKindMention = "mention"
1129
+
NotifKindRepost = "repost"
1130
+
)
1131
+
1132
+
func (b *PostgresBackend) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error {
1133
+
return b.db.Create(&Notification{
1134
+
For: forUser,
1135
+
Author: author,
1136
+
Source: recordUri,
1137
+
SourceCid: recordCid.String(),
1138
+
Kind: kind,
1139
+
}).Error
1140
+
}