+1125
events.go
+1125
events.go
···
1
+
package main
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"log/slog"
8
+
"strings"
9
+
"sync"
10
+
"time"
11
+
12
+
"github.com/bluesky-social/indigo/api/atproto"
13
+
"github.com/bluesky-social/indigo/api/bsky"
14
+
"github.com/bluesky-social/indigo/atproto/syntax"
15
+
"github.com/bluesky-social/indigo/repo"
16
+
lru "github.com/hashicorp/golang-lru/v2"
17
+
"github.com/ipfs/go-cid"
18
+
"github.com/jackc/pgx/v5/pgconn"
19
+
"github.com/jackc/pgx/v5/pgxpool"
20
+
"gorm.io/gorm"
21
+
)
22
+
23
+
type PostgresBackend struct {
24
+
db *gorm.DB
25
+
pgx *pgxpool.Pool
26
+
s *Server
27
+
28
+
relevantDids map[string]bool
29
+
rdLk sync.Mutex
30
+
31
+
revCache *lru.TwoQueueCache[uint, string]
32
+
33
+
repoCache *lru.TwoQueueCache[string, *Repo]
34
+
reposLk sync.Mutex
35
+
36
+
postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
37
+
}
38
+
39
+
func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
40
+
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
41
+
if err != nil {
42
+
return fmt.Errorf("failed to read event repo: %w", err)
43
+
}
44
+
45
+
for _, op := range evt.Ops {
46
+
switch op.Action {
47
+
case "create":
48
+
c, rec, err := r.GetRecordBytes(ctx, op.Path)
49
+
if err != nil {
50
+
return err
51
+
}
52
+
if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
53
+
return fmt.Errorf("create record failed: %w", err)
54
+
}
55
+
case "update":
56
+
c, rec, err := r.GetRecordBytes(ctx, op.Path)
57
+
if err != nil {
58
+
return err
59
+
}
60
+
if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
61
+
return fmt.Errorf("update record failed: %w", err)
62
+
}
63
+
case "delete":
64
+
if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
65
+
return fmt.Errorf("delete record failed: %w", err)
66
+
}
67
+
}
68
+
}
69
+
70
+
// TODO: sync with the Since field to make sure we don't miss events we care about
71
+
/*
72
+
if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
73
+
return fmt.Errorf("failed to update rev: %w", err)
74
+
}
75
+
*/
76
+
77
+
return nil
78
+
}
79
+
80
+
func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
81
+
start := time.Now()
82
+
83
+
rr, err := b.getOrCreateRepo(ctx, repo)
84
+
if err != nil {
85
+
return fmt.Errorf("get user failed: %w", err)
86
+
}
87
+
88
+
lrev, err := b.revForRepo(rr)
89
+
if err != nil {
90
+
return err
91
+
}
92
+
if lrev != "" {
93
+
if rev < lrev {
94
+
slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
95
+
return nil
96
+
}
97
+
}
98
+
99
+
parts := strings.Split(path, "/")
100
+
if len(parts) != 2 {
101
+
return fmt.Errorf("invalid path in HandleCreate: %q", path)
102
+
}
103
+
col := parts[0]
104
+
rkey := parts[1]
105
+
106
+
defer func() {
107
+
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
108
+
}()
109
+
110
+
if rkey == "" {
111
+
fmt.Printf("messed up path: %q\n", rkey)
112
+
}
113
+
114
+
switch col {
115
+
case "app.bsky.feed.post":
116
+
if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
117
+
return err
118
+
}
119
+
case "app.bsky.feed.like":
120
+
if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
121
+
return err
122
+
}
123
+
case "app.bsky.feed.repost":
124
+
if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
125
+
return err
126
+
}
127
+
case "app.bsky.graph.follow":
128
+
if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
129
+
return err
130
+
}
131
+
case "app.bsky.graph.block":
132
+
if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
133
+
return err
134
+
}
135
+
case "app.bsky.graph.list":
136
+
if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
137
+
return err
138
+
}
139
+
case "app.bsky.graph.listitem":
140
+
if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
141
+
return err
142
+
}
143
+
case "app.bsky.graph.listblock":
144
+
if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
145
+
return err
146
+
}
147
+
case "app.bsky.actor.profile":
148
+
if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
149
+
return err
150
+
}
151
+
case "app.bsky.feed.generator":
152
+
if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
153
+
return err
154
+
}
155
+
case "app.bsky.feed.threadgate":
156
+
if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
157
+
return err
158
+
}
159
+
case "chat.bsky.actor.declaration":
160
+
if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
161
+
return err
162
+
}
163
+
case "app.bsky.feed.postgate":
164
+
if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil {
165
+
return err
166
+
}
167
+
case "app.bsky.graph.starterpack":
168
+
if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil {
169
+
return err
170
+
}
171
+
default:
172
+
slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev)
173
+
}
174
+
175
+
b.revCache.Add(rr.ID, rev)
176
+
return nil
177
+
}
178
+
179
+
func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
180
+
exists, err := b.checkPostExists(ctx, repo, rkey)
181
+
if err != nil {
182
+
return err
183
+
}
184
+
185
+
// still technically a race condition if two creates for the same post happen concurrently... probably fine
186
+
if exists {
187
+
return nil
188
+
}
189
+
190
+
var rec bsky.FeedPost
191
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
192
+
return err
193
+
}
194
+
195
+
reldids := []string{repo.Did}
196
+
// care about a post if its in a thread of a user we are interested in
197
+
if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
198
+
reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri)
199
+
}
200
+
// TODO: maybe also care if its mentioning a user we care about or quoting a user we care about?
201
+
if !b.anyRelevantIdents(reldids...) {
202
+
return nil
203
+
}
204
+
205
+
uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
206
+
slog.Warn("adding post", "uri", uri)
207
+
208
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
209
+
if err != nil {
210
+
return fmt.Errorf("invalid timestamp: %w", err)
211
+
}
212
+
213
+
p := Post{
214
+
Created: created.Time(),
215
+
Indexed: time.Now(),
216
+
Author: repo.ID,
217
+
Rkey: rkey,
218
+
Raw: recb,
219
+
Cid: cc.String(),
220
+
}
221
+
222
+
if rec.Reply != nil && rec.Reply.Parent != nil {
223
+
if rec.Reply.Root == nil {
224
+
return fmt.Errorf("post reply had nil root")
225
+
}
226
+
227
+
pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri)
228
+
if err != nil {
229
+
return fmt.Errorf("getting reply parent: %w", err)
230
+
}
231
+
232
+
p.ReplyTo = pinfo.ID
233
+
p.ReplyToUsr = pinfo.Author
234
+
235
+
thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri)
236
+
if err != nil {
237
+
return fmt.Errorf("getting thread root: %w", err)
238
+
}
239
+
240
+
p.InThread = thread
241
+
242
+
if p.ReplyToUsr == b.s.myrepo.ID {
243
+
if err := b.s.AddNotification(ctx, b.s.myrepo.ID, p.Author, uri, NotifKindReply); err != nil {
244
+
slog.Warn("failed to create notification", "uri", uri, "error", err)
245
+
}
246
+
}
247
+
}
248
+
249
+
if rec.Embed != nil {
250
+
var rpref string
251
+
if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
252
+
rpref = rec.Embed.EmbedRecord.Record.Uri
253
+
}
254
+
if rec.Embed.EmbedRecordWithMedia != nil &&
255
+
rec.Embed.EmbedRecordWithMedia.Record != nil &&
256
+
rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
257
+
rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
258
+
}
259
+
260
+
if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") {
261
+
rp, err := b.postIDForUri(ctx, rpref)
262
+
if err != nil {
263
+
return fmt.Errorf("getting quote subject: %w", err)
264
+
}
265
+
266
+
p.Reposting = rp
267
+
}
268
+
}
269
+
270
+
if err := b.doPostCreate(ctx, &p); err != nil {
271
+
return err
272
+
}
273
+
274
+
// Check for mentions and create notifications
275
+
if rec.Facets != nil {
276
+
for _, facet := range rec.Facets {
277
+
for _, feature := range facet.Features {
278
+
if feature.RichtextFacet_Mention != nil {
279
+
mentionDid := feature.RichtextFacet_Mention.Did
280
+
// This is a mention
281
+
mentionedRepo, err := b.getOrCreateRepo(ctx, mentionDid)
282
+
if err != nil {
283
+
slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err)
284
+
continue
285
+
}
286
+
287
+
// Create notification if the mentioned user is the current user
288
+
if mentionedRepo.ID == b.s.myrepo.ID {
289
+
if err := b.s.AddNotification(ctx, b.s.myrepo.ID, p.Author, uri, NotifKindMention); err != nil {
290
+
slog.Warn("failed to create mention notification", "uri", uri, "error", err)
291
+
}
292
+
}
293
+
}
294
+
}
295
+
}
296
+
}
297
+
298
+
b.postInfoCache.Add(uri, cachedPostInfo{
299
+
ID: p.ID,
300
+
Author: p.Author,
301
+
})
302
+
303
+
return nil
304
+
}
305
+
306
+
func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error {
307
+
/*
308
+
if err := b.db.Clauses(clause.OnConflict{
309
+
Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}},
310
+
DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}),
311
+
}).Create(p).Error; err != nil {
312
+
return err
313
+
}
314
+
*/
315
+
316
+
query := `
317
+
INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread)
318
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
319
+
ON CONFLICT (author, rkey)
320
+
DO UPDATE SET
321
+
cid = $3,
322
+
not_found = $4,
323
+
raw = $5,
324
+
created = $6,
325
+
indexed = $7,
326
+
reposting = $8,
327
+
reply_to = $9,
328
+
reply_to_usr = $10,
329
+
in_thread = $11
330
+
RETURNING id
331
+
`
332
+
333
+
// Execute the query with parameters from the Post struct
334
+
if err := b.pgx.QueryRow(
335
+
ctx,
336
+
query,
337
+
p.Author,
338
+
p.Rkey,
339
+
p.Cid,
340
+
p.NotFound,
341
+
p.Raw,
342
+
p.Created,
343
+
p.Indexed,
344
+
p.Reposting,
345
+
p.ReplyTo,
346
+
p.ReplyToUsr,
347
+
p.InThread,
348
+
).Scan(&p.ID); err != nil {
349
+
return err
350
+
}
351
+
352
+
return nil
353
+
}
354
+
355
+
func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
356
+
var rec bsky.FeedLike
357
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
358
+
return err
359
+
}
360
+
361
+
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
362
+
return nil
363
+
}
364
+
365
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
366
+
if err != nil {
367
+
return fmt.Errorf("invalid timestamp: %w", err)
368
+
}
369
+
370
+
pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
371
+
if err != nil {
372
+
return fmt.Errorf("getting like subject: %w", err)
373
+
}
374
+
375
+
if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil {
376
+
pgErr, ok := err.(*pgconn.PgError)
377
+
if ok && pgErr.Code == "23505" {
378
+
return nil
379
+
}
380
+
return err
381
+
}
382
+
383
+
// Create notification if the liked post belongs to the current user
384
+
if pinfo.Author == b.s.myrepo.ID {
385
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey)
386
+
if err := b.s.AddNotification(ctx, b.s.myrepo.ID, repo.ID, uri, NotifKindLike); err != nil {
387
+
slog.Warn("failed to create like notification", "uri", uri, "error", err)
388
+
}
389
+
}
390
+
391
+
return nil
392
+
}
393
+
394
+
func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
395
+
var rec bsky.FeedRepost
396
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
397
+
return err
398
+
}
399
+
400
+
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
401
+
return nil
402
+
}
403
+
404
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
405
+
if err != nil {
406
+
return fmt.Errorf("invalid timestamp: %w", err)
407
+
}
408
+
409
+
pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri)
410
+
if err != nil {
411
+
return fmt.Errorf("getting repost subject: %w", err)
412
+
}
413
+
414
+
if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil {
415
+
pgErr, ok := err.(*pgconn.PgError)
416
+
if ok && pgErr.Code == "23505" {
417
+
return nil
418
+
}
419
+
return err
420
+
}
421
+
422
+
// Create notification if the reposted post belongs to the current user
423
+
if pinfo.Author == b.s.myrepo.ID {
424
+
uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey)
425
+
if err := b.s.AddNotification(ctx, b.s.myrepo.ID, repo.ID, uri, NotifKindRepost); err != nil {
426
+
slog.Warn("failed to create repost notification", "uri", uri, "error", err)
427
+
}
428
+
}
429
+
430
+
return nil
431
+
}
432
+
433
+
func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
434
+
var rec bsky.GraphFollow
435
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
436
+
return err
437
+
}
438
+
439
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
440
+
return nil
441
+
}
442
+
443
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
444
+
if err != nil {
445
+
return fmt.Errorf("invalid timestamp: %w", err)
446
+
}
447
+
448
+
subj, err := b.getOrCreateRepo(ctx, rec.Subject)
449
+
if err != nil {
450
+
return err
451
+
}
452
+
453
+
if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil {
454
+
return err
455
+
}
456
+
457
+
return nil
458
+
}
459
+
460
+
func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
461
+
var rec bsky.GraphBlock
462
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
463
+
return err
464
+
}
465
+
466
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
467
+
return nil
468
+
}
469
+
470
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
471
+
if err != nil {
472
+
return fmt.Errorf("invalid timestamp: %w", err)
473
+
}
474
+
475
+
subj, err := b.getOrCreateRepo(ctx, rec.Subject)
476
+
if err != nil {
477
+
return err
478
+
}
479
+
480
+
if err := b.db.Create(&Block{
481
+
Created: created.Time(),
482
+
Indexed: time.Now(),
483
+
Author: repo.ID,
484
+
Rkey: rkey,
485
+
Subject: subj.ID,
486
+
}).Error; err != nil {
487
+
return err
488
+
}
489
+
490
+
return nil
491
+
}
492
+
493
+
func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
494
+
var rec bsky.GraphList
495
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
496
+
return err
497
+
}
498
+
499
+
if !b.anyRelevantIdents(repo.Did) {
500
+
return nil
501
+
}
502
+
503
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
504
+
if err != nil {
505
+
return fmt.Errorf("invalid timestamp: %w", err)
506
+
}
507
+
508
+
if err := b.db.Create(&List{
509
+
Created: created.Time(),
510
+
Indexed: time.Now(),
511
+
Author: repo.ID,
512
+
Rkey: rkey,
513
+
Raw: recb,
514
+
}).Error; err != nil {
515
+
return err
516
+
}
517
+
518
+
return nil
519
+
}
520
+
521
+
func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
522
+
var rec bsky.GraphListitem
523
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
524
+
return err
525
+
}
526
+
if !b.anyRelevantIdents(repo.Did) {
527
+
return nil
528
+
}
529
+
530
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
531
+
if err != nil {
532
+
return fmt.Errorf("invalid timestamp: %w", err)
533
+
}
534
+
535
+
subj, err := b.getOrCreateRepo(ctx, rec.Subject)
536
+
if err != nil {
537
+
return err
538
+
}
539
+
540
+
list, err := b.getOrCreateList(ctx, rec.List)
541
+
if err != nil {
542
+
return err
543
+
}
544
+
545
+
if err := b.db.Create(&ListItem{
546
+
Created: created.Time(),
547
+
Indexed: time.Now(),
548
+
Author: repo.ID,
549
+
Rkey: rkey,
550
+
Subject: subj.ID,
551
+
List: list.ID,
552
+
}).Error; err != nil {
553
+
return err
554
+
}
555
+
556
+
return nil
557
+
}
558
+
559
+
func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
560
+
var rec bsky.GraphListblock
561
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
562
+
return err
563
+
}
564
+
565
+
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
566
+
return nil
567
+
}
568
+
569
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
570
+
if err != nil {
571
+
return fmt.Errorf("invalid timestamp: %w", err)
572
+
}
573
+
574
+
list, err := b.getOrCreateList(ctx, rec.Subject)
575
+
if err != nil {
576
+
return err
577
+
}
578
+
579
+
if err := b.db.Create(&ListBlock{
580
+
Created: created.Time(),
581
+
Indexed: time.Now(),
582
+
Author: repo.ID,
583
+
Rkey: rkey,
584
+
List: list.ID,
585
+
}).Error; err != nil {
586
+
return err
587
+
}
588
+
589
+
return nil
590
+
}
591
+
592
+
func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
593
+
if !b.anyRelevantIdents(repo.Did) {
594
+
return nil
595
+
}
596
+
597
+
if err := b.db.Create(&Profile{
598
+
//Created: created.Time(),
599
+
Indexed: time.Now(),
600
+
Repo: repo.ID,
601
+
Raw: recb,
602
+
Rev: rev,
603
+
}).Error; err != nil {
604
+
return err
605
+
}
606
+
607
+
return nil
608
+
}
609
+
610
+
func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
611
+
if !b.anyRelevantIdents(repo.Did) {
612
+
return nil
613
+
}
614
+
615
+
if err := b.db.Create(&Profile{
616
+
Indexed: time.Now(),
617
+
Repo: repo.ID,
618
+
Raw: recb,
619
+
Rev: rev,
620
+
}).Error; err != nil {
621
+
return err
622
+
}
623
+
624
+
return nil
625
+
}
626
+
627
+
func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
628
+
if !b.anyRelevantIdents(repo.Did) {
629
+
return nil
630
+
}
631
+
632
+
var rec bsky.FeedGenerator
633
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
634
+
return err
635
+
}
636
+
637
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
638
+
if err != nil {
639
+
return fmt.Errorf("invalid timestamp: %w", err)
640
+
}
641
+
642
+
if err := b.db.Create(&FeedGenerator{
643
+
Created: created.Time(),
644
+
Indexed: time.Now(),
645
+
Author: repo.ID,
646
+
Rkey: rkey,
647
+
Did: rec.Did,
648
+
}).Error; err != nil {
649
+
return err
650
+
}
651
+
652
+
return nil
653
+
}
654
+
655
+
func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
656
+
if !b.anyRelevantIdents(repo.Did) {
657
+
return nil
658
+
}
659
+
var rec bsky.FeedThreadgate
660
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
661
+
return err
662
+
}
663
+
664
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
665
+
if err != nil {
666
+
return fmt.Errorf("invalid timestamp: %w", err)
667
+
}
668
+
669
+
pid, err := b.postIDForUri(ctx, rec.Post)
670
+
if err != nil {
671
+
return err
672
+
}
673
+
674
+
if err := b.db.Create(&ThreadGate{
675
+
Created: created.Time(),
676
+
Indexed: time.Now(),
677
+
Author: repo.ID,
678
+
Rkey: rkey,
679
+
Post: pid,
680
+
}).Error; err != nil {
681
+
return err
682
+
}
683
+
684
+
return nil
685
+
}
686
+
687
+
func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
688
+
// TODO: maybe track these?
689
+
return nil
690
+
}
691
+
692
+
func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
693
+
if !b.anyRelevantIdents(repo.Did) {
694
+
return nil
695
+
}
696
+
var rec bsky.FeedPostgate
697
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
698
+
return err
699
+
}
700
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
701
+
if err != nil {
702
+
return fmt.Errorf("invalid timestamp: %w", err)
703
+
}
704
+
705
+
refPost, err := b.postInfoForUri(ctx, rec.Post)
706
+
if err != nil {
707
+
return err
708
+
}
709
+
710
+
if err := b.db.Create(&PostGate{
711
+
Created: created.Time(),
712
+
Indexed: time.Now(),
713
+
Author: repo.ID,
714
+
Rkey: rkey,
715
+
Subject: refPost.ID,
716
+
Raw: recb,
717
+
}).Error; err != nil {
718
+
return err
719
+
}
720
+
721
+
return nil
722
+
}
723
+
724
+
func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
725
+
if !b.anyRelevantIdents(repo.Did) {
726
+
return nil
727
+
}
728
+
var rec bsky.GraphStarterpack
729
+
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
730
+
return err
731
+
}
732
+
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
733
+
if err != nil {
734
+
return fmt.Errorf("invalid timestamp: %w", err)
735
+
}
736
+
737
+
list, err := b.getOrCreateList(ctx, rec.List)
738
+
if err != nil {
739
+
return err
740
+
}
741
+
742
+
if err := b.db.Create(&StarterPack{
743
+
Created: created.Time(),
744
+
Indexed: time.Now(),
745
+
Author: repo.ID,
746
+
Rkey: rkey,
747
+
Raw: recb,
748
+
List: list.ID,
749
+
}).Error; err != nil {
750
+
return err
751
+
}
752
+
753
+
return nil
754
+
}
755
+
756
+
func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
757
+
start := time.Now()
758
+
759
+
rr, err := b.getOrCreateRepo(ctx, repo)
760
+
if err != nil {
761
+
return fmt.Errorf("get user failed: %w", err)
762
+
}
763
+
764
+
lrev, err := b.revForRepo(rr)
765
+
if err != nil {
766
+
return err
767
+
}
768
+
if lrev != "" {
769
+
if rev < lrev {
770
+
//slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
771
+
return nil
772
+
}
773
+
}
774
+
775
+
parts := strings.Split(path, "/")
776
+
if len(parts) != 2 {
777
+
return fmt.Errorf("invalid path in HandleCreate: %q", path)
778
+
}
779
+
col := parts[0]
780
+
rkey := parts[1]
781
+
782
+
defer func() {
783
+
handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds()))
784
+
}()
785
+
786
+
if rkey == "" {
787
+
fmt.Printf("messed up path: %q\n", rkey)
788
+
}
789
+
790
+
switch col {
791
+
/*
792
+
case "app.bsky.feed.post":
793
+
if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
794
+
return err
795
+
}
796
+
case "app.bsky.feed.like":
797
+
if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
798
+
return err
799
+
}
800
+
case "app.bsky.feed.repost":
801
+
if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
802
+
return err
803
+
}
804
+
case "app.bsky.graph.follow":
805
+
if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
806
+
return err
807
+
}
808
+
case "app.bsky.graph.block":
809
+
if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
810
+
return err
811
+
}
812
+
case "app.bsky.graph.list":
813
+
if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
814
+
return err
815
+
}
816
+
case "app.bsky.graph.listitem":
817
+
if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
818
+
return err
819
+
}
820
+
case "app.bsky.graph.listblock":
821
+
if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
822
+
return err
823
+
}
824
+
*/
825
+
case "app.bsky.actor.profile":
826
+
if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
827
+
return err
828
+
}
829
+
/*
830
+
case "app.bsky.feed.generator":
831
+
if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
832
+
return err
833
+
}
834
+
case "app.bsky.feed.threadgate":
835
+
if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
836
+
return err
837
+
}
838
+
case "chat.bsky.actor.declaration":
839
+
if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
840
+
return err
841
+
}
842
+
*/
843
+
default:
844
+
slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev)
845
+
}
846
+
847
+
return nil
848
+
}
849
+
850
+
func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
851
+
start := time.Now()
852
+
853
+
rr, err := b.getOrCreateRepo(ctx, repo)
854
+
if err != nil {
855
+
return fmt.Errorf("get user failed: %w", err)
856
+
}
857
+
858
+
lrev, ok := b.revCache.Get(rr.ID)
859
+
if ok {
860
+
if rev < lrev {
861
+
//slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev)
862
+
return nil
863
+
}
864
+
}
865
+
866
+
parts := strings.Split(path, "/")
867
+
if len(parts) != 2 {
868
+
return fmt.Errorf("invalid path in HandleDelete: %q", path)
869
+
}
870
+
col := parts[0]
871
+
rkey := parts[1]
872
+
873
+
defer func() {
874
+
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
875
+
}()
876
+
877
+
switch col {
878
+
case "app.bsky.feed.post":
879
+
if err := b.HandleDeletePost(ctx, rr, rkey); err != nil {
880
+
return err
881
+
}
882
+
case "app.bsky.feed.like":
883
+
if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil {
884
+
return err
885
+
}
886
+
case "app.bsky.feed.repost":
887
+
if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil {
888
+
return err
889
+
}
890
+
case "app.bsky.graph.follow":
891
+
if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil {
892
+
return err
893
+
}
894
+
case "app.bsky.graph.block":
895
+
if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil {
896
+
return err
897
+
}
898
+
case "app.bsky.graph.list":
899
+
if err := b.HandleDeleteList(ctx, rr, rkey); err != nil {
900
+
return err
901
+
}
902
+
case "app.bsky.graph.listitem":
903
+
if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil {
904
+
return err
905
+
}
906
+
case "app.bsky.graph.listblock":
907
+
if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil {
908
+
return err
909
+
}
910
+
case "app.bsky.actor.profile":
911
+
if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil {
912
+
return err
913
+
}
914
+
case "app.bsky.feed.generator":
915
+
if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil {
916
+
return err
917
+
}
918
+
case "app.bsky.feed.threadgate":
919
+
if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil {
920
+
return err
921
+
}
922
+
default:
923
+
slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev)
924
+
}
925
+
926
+
b.revCache.Add(rr.ID, rev)
927
+
return nil
928
+
}
929
+
930
+
func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error {
931
+
var p Post
932
+
if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
933
+
return err
934
+
}
935
+
936
+
if p.ID == 0 {
937
+
//slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey)
938
+
return nil
939
+
}
940
+
941
+
if err := b.db.Delete(&Post{}, p.ID).Error; err != nil {
942
+
return err
943
+
}
944
+
945
+
return nil
946
+
}
947
+
948
+
func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error {
949
+
var like Like
950
+
if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
951
+
return err
952
+
}
953
+
954
+
if like.ID == 0 {
955
+
//slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey)
956
+
return nil
957
+
}
958
+
959
+
if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil {
960
+
return err
961
+
}
962
+
963
+
return nil
964
+
}
965
+
966
+
func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error {
967
+
var repost Repost
968
+
if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
969
+
return err
970
+
}
971
+
972
+
if repost.ID == 0 {
973
+
//return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey)
974
+
return nil
975
+
}
976
+
977
+
if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil {
978
+
return err
979
+
}
980
+
981
+
return nil
982
+
}
983
+
984
+
func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error {
985
+
var follow Follow
986
+
if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
987
+
return err
988
+
}
989
+
990
+
if follow.ID == 0 {
991
+
//slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey)
992
+
return nil
993
+
}
994
+
995
+
if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil {
996
+
return err
997
+
}
998
+
999
+
return nil
1000
+
}
1001
+
1002
+
func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error {
1003
+
var block Block
1004
+
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1005
+
return err
1006
+
}
1007
+
1008
+
if block.ID == 0 {
1009
+
//slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey)
1010
+
return nil
1011
+
}
1012
+
1013
+
if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil {
1014
+
return err
1015
+
}
1016
+
1017
+
return nil
1018
+
}
1019
+
1020
+
func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error {
1021
+
var list List
1022
+
if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1023
+
return err
1024
+
}
1025
+
1026
+
if list.ID == 0 {
1027
+
return nil
1028
+
//return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey)
1029
+
}
1030
+
1031
+
if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil {
1032
+
return err
1033
+
}
1034
+
1035
+
return nil
1036
+
}
1037
+
1038
+
func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error {
1039
+
var item ListItem
1040
+
if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1041
+
return err
1042
+
}
1043
+
1044
+
if item.ID == 0 {
1045
+
return nil
1046
+
//return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey)
1047
+
}
1048
+
1049
+
if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil {
1050
+
return err
1051
+
}
1052
+
1053
+
return nil
1054
+
}
1055
+
1056
+
func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error {
1057
+
var block ListBlock
1058
+
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1059
+
return err
1060
+
}
1061
+
1062
+
if block.ID == 0 {
1063
+
return nil
1064
+
//return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey)
1065
+
}
1066
+
1067
+
if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil {
1068
+
return err
1069
+
}
1070
+
1071
+
return nil
1072
+
}
1073
+
1074
+
func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error {
1075
+
var feedgen FeedGenerator
1076
+
if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1077
+
return err
1078
+
}
1079
+
1080
+
if feedgen.ID == 0 {
1081
+
return nil
1082
+
//return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey)
1083
+
}
1084
+
1085
+
if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil {
1086
+
return err
1087
+
}
1088
+
1089
+
return nil
1090
+
}
1091
+
1092
+
func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error {
1093
+
var threadgate ThreadGate
1094
+
if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1095
+
return err
1096
+
}
1097
+
1098
+
if threadgate.ID == 0 {
1099
+
return nil
1100
+
//return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey)
1101
+
}
1102
+
1103
+
if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil {
1104
+
return err
1105
+
}
1106
+
1107
+
return nil
1108
+
}
1109
+
1110
+
func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error {
1111
+
var profile Profile
1112
+
if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil {
1113
+
return err
1114
+
}
1115
+
1116
+
if profile.ID == 0 {
1117
+
return nil
1118
+
}
1119
+
1120
+
if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil {
1121
+
return err
1122
+
}
1123
+
1124
+
return nil
1125
+
}
+8
frontend/src/App.tsx
+8
frontend/src/App.tsx
···
5
5
import { PostView } from './components/PostView';
6
6
import { ThreadView } from './components/ThreadView';
7
7
import { PostComposer } from './components/PostComposer';
8
+
import { NotificationsPage } from './components/NotificationsPage';
8
9
import { ApiClient } from './api';
9
10
import './App.css';
10
11
···
33
34
>
34
35
Following
35
36
</Link>
37
+
<Link
38
+
to="/notifications"
39
+
className={`nav-link ${location.pathname === '/notifications' ? 'active' : ''}`}
40
+
>
41
+
Notifications
42
+
</Link>
36
43
{myHandle && (
37
44
<Link
38
45
to={`/profile/${myHandle}`}
···
57
64
<main className="app-main">
58
65
<Routes>
59
66
<Route path="/" element={<FollowingFeed />} />
67
+
<Route path="/notifications" element={<NotificationsPage />} />
60
68
<Route path="/profile/:account" element={<ProfilePage />} />
61
69
<Route path="/profile/:account/post/:rkey" element={<PostView />} />
62
70
<Route path="/thread" element={<ThreadView />} />
+12
-1
frontend/src/api.ts
+12
-1
frontend/src/api.ts
···
1
-
import { PostResponse, ActorProfile, ApiError, ThreadResponse, EngagementResponse, FeedResponse } from './types';
1
+
import { PostResponse, ActorProfile, ApiError, ThreadResponse, EngagementResponse, FeedResponse, NotificationsResponse } from './types';
2
2
3
3
const API_BASE_URL = 'http://localhost:4444/api';
4
4
···
115
115
text: text,
116
116
createdAt: new Date().toISOString(),
117
117
});
118
+
}
119
+
120
+
static async getNotifications(cursor?: string): Promise<NotificationsResponse> {
121
+
const url = cursor
122
+
? `${API_BASE_URL}/notifications?cursor=${encodeURIComponent(cursor)}`
123
+
: `${API_BASE_URL}/notifications`;
124
+
const response = await fetch(url);
125
+
if (!response.ok) {
126
+
throw new Error(`Failed to fetch notifications: ${response.statusText}`);
127
+
}
128
+
return response.json();
118
129
}
119
130
}
+184
frontend/src/components/NotificationsPage.css
+184
frontend/src/components/NotificationsPage.css
···
1
+
.notifications-page {
2
+
max-width: 600px;
3
+
margin: 0 auto;
4
+
background: white;
5
+
min-height: 100vh;
6
+
}
7
+
8
+
.notifications-header {
9
+
padding: 16px 20px;
10
+
border-bottom: 1px solid #e1e8ed;
11
+
background: white;
12
+
position: sticky;
13
+
top: 0;
14
+
z-index: 10;
15
+
}
16
+
17
+
.notifications-header h1 {
18
+
margin: 0;
19
+
font-size: 20px;
20
+
font-weight: 700;
21
+
color: #0f1419;
22
+
}
23
+
24
+
.notifications-list {
25
+
padding-bottom: 20px;
26
+
}
27
+
28
+
.notification {
29
+
border-bottom: 1px solid #e1e8ed;
30
+
transition: background-color 0.2s;
31
+
}
32
+
33
+
.notification:hover {
34
+
background-color: #f7f9fa;
35
+
}
36
+
37
+
.notification-link,
38
+
.notification-inner {
39
+
display: flex;
40
+
padding: 16px 20px;
41
+
text-decoration: none;
42
+
color: inherit;
43
+
gap: 12px;
44
+
}
45
+
46
+
.notification-link:hover {
47
+
text-decoration: none;
48
+
}
49
+
50
+
.notification-icon {
51
+
font-size: 24px;
52
+
flex-shrink: 0;
53
+
width: 32px;
54
+
height: 32px;
55
+
display: flex;
56
+
align-items: center;
57
+
justify-content: center;
58
+
}
59
+
60
+
.notification-like .notification-icon {
61
+
color: #e0245e;
62
+
}
63
+
64
+
.notification-reply .notification-icon {
65
+
color: #1da1f2;
66
+
}
67
+
68
+
.notification-repost .notification-icon {
69
+
color: #17bf63;
70
+
}
71
+
72
+
.notification-mention .notification-icon {
73
+
color: #794bc4;
74
+
}
75
+
76
+
.notification-content {
77
+
flex: 1;
78
+
min-width: 0;
79
+
}
80
+
81
+
.notification-author {
82
+
display: flex;
83
+
align-items: flex-start;
84
+
gap: 8px;
85
+
margin-bottom: 8px;
86
+
}
87
+
88
+
.notification-avatar {
89
+
width: 32px;
90
+
height: 32px;
91
+
border-radius: 50%;
92
+
object-fit: cover;
93
+
flex-shrink: 0;
94
+
}
95
+
96
+
.notification-text {
97
+
flex: 1;
98
+
font-size: 15px;
99
+
line-height: 1.4;
100
+
}
101
+
102
+
.notification-author-name {
103
+
font-weight: 600;
104
+
color: #0f1419;
105
+
text-decoration: none;
106
+
}
107
+
108
+
.notification-author-name:hover {
109
+
text-decoration: underline;
110
+
}
111
+
112
+
.notification-action {
113
+
color: #536471;
114
+
}
115
+
116
+
.notification-preview {
117
+
padding: 12px;
118
+
margin-top: 8px;
119
+
background-color: #f7f9fa;
120
+
border-radius: 8px;
121
+
font-size: 14px;
122
+
color: #0f1419;
123
+
line-height: 1.4;
124
+
white-space: pre-wrap;
125
+
overflow: hidden;
126
+
}
127
+
128
+
.notification-time {
129
+
font-size: 13px;
130
+
color: #657786;
131
+
margin-top: 4px;
132
+
}
133
+
134
+
.loading {
135
+
text-align: center;
136
+
padding: 40px;
137
+
font-size: 16px;
138
+
color: #536471;
139
+
}
140
+
141
+
.error {
142
+
text-align: center;
143
+
padding: 40px;
144
+
margin: 20px;
145
+
font-size: 16px;
146
+
color: #d63939;
147
+
background-color: #fef2f2;
148
+
border: 1px solid #fecaca;
149
+
border-radius: 8px;
150
+
}
151
+
152
+
.empty-notifications {
153
+
text-align: center;
154
+
padding: 60px 20px;
155
+
color: #536471;
156
+
}
157
+
158
+
.empty-notifications p {
159
+
margin: 0;
160
+
font-size: 16px;
161
+
}
162
+
163
+
.load-more-trigger {
164
+
min-height: 20px;
165
+
padding: 20px 0;
166
+
}
167
+
168
+
.loading-more {
169
+
text-align: center;
170
+
padding: 20px;
171
+
color: #536471;
172
+
font-size: 14px;
173
+
}
174
+
175
+
.end-of-notifications {
176
+
text-align: center;
177
+
padding: 40px 20px;
178
+
color: #657786;
179
+
font-size: 14px;
180
+
}
181
+
182
+
.end-of-notifications p {
183
+
margin: 0;
184
+
}
+208
frontend/src/components/NotificationsPage.tsx
+208
frontend/src/components/NotificationsPage.tsx
···
1
+
import React, { useState, useEffect, useRef, useCallback } from 'react';
2
+
import { Notification } from '../types';
3
+
import { ApiClient } from '../api';
4
+
import { formatRelativeTime, getBlobUrl, getPostUrl } from '../utils';
5
+
import { Link } from 'react-router-dom';
6
+
import './NotificationsPage.css';
7
+
8
+
export const NotificationsPage: React.FC = () => {
9
+
const [notifications, setNotifications] = useState<Notification[]>([]);
10
+
const [loading, setLoading] = useState(true);
11
+
const [loadingMore, setLoadingMore] = useState(false);
12
+
const [error, setError] = useState<string | null>(null);
13
+
const [cursor, setCursor] = useState<string | null>(null);
14
+
const [hasMore, setHasMore] = useState(true);
15
+
const observerTarget = useRef<HTMLDivElement>(null);
16
+
17
+
useEffect(() => {
18
+
const fetchNotifications = async () => {
19
+
try {
20
+
setLoading(true);
21
+
const data = await ApiClient.getNotifications();
22
+
setNotifications(data.notifications);
23
+
setCursor(data.cursor || null);
24
+
setHasMore(!!(data.cursor && data.notifications.length > 0));
25
+
} catch (err) {
26
+
setError(err instanceof Error ? err.message : 'Failed to load notifications');
27
+
} finally {
28
+
setLoading(false);
29
+
}
30
+
};
31
+
32
+
fetchNotifications();
33
+
}, []);
34
+
35
+
const fetchMoreNotifications = useCallback(async (cursorToUse: string) => {
36
+
if (loadingMore || !hasMore) return;
37
+
38
+
try {
39
+
setLoadingMore(true);
40
+
const data = await ApiClient.getNotifications(cursorToUse);
41
+
setNotifications(prev => [...prev, ...data.notifications]);
42
+
setCursor(data.cursor || null);
43
+
setHasMore(!!(data.cursor && data.notifications.length > 0));
44
+
} catch (err) {
45
+
console.error('Failed to fetch more notifications:', err);
46
+
} finally {
47
+
setLoadingMore(false);
48
+
}
49
+
}, [loadingMore, hasMore]);
50
+
51
+
useEffect(() => {
52
+
const observer = new IntersectionObserver(
53
+
(entries) => {
54
+
if (entries[0].isIntersecting && hasMore && !loadingMore && !loading && cursor) {
55
+
fetchMoreNotifications(cursor);
56
+
}
57
+
},
58
+
{ threshold: 0.1 }
59
+
);
60
+
61
+
const currentTarget = observerTarget.current;
62
+
if (currentTarget) {
63
+
observer.observe(currentTarget);
64
+
}
65
+
66
+
return () => {
67
+
if (currentTarget) {
68
+
observer.unobserve(currentTarget);
69
+
}
70
+
};
71
+
}, [hasMore, loadingMore, loading, cursor, fetchMoreNotifications]);
72
+
73
+
const getNotificationIcon = (kind: string) => {
74
+
switch (kind) {
75
+
case 'like':
76
+
return '♥';
77
+
case 'reply':
78
+
return '💬';
79
+
case 'repost':
80
+
return '🔄';
81
+
case 'mention':
82
+
return '@';
83
+
default:
84
+
return '🔔';
85
+
}
86
+
};
87
+
88
+
const getNotificationText = (notif: Notification) => {
89
+
switch (notif.kind) {
90
+
case 'like':
91
+
return 'liked your post';
92
+
case 'reply':
93
+
return 'replied to your post';
94
+
case 'repost':
95
+
return 'reposted your post';
96
+
case 'mention':
97
+
return 'mentioned you in a post';
98
+
default:
99
+
return 'interacted with your post';
100
+
}
101
+
};
102
+
103
+
const getNotificationLink = (notif: Notification) => {
104
+
// For replies and mentions, link to the post
105
+
// For likes and reposts, we could link to the original post but we don't have it easily
106
+
if (notif.kind === 'reply' || notif.kind === 'mention') {
107
+
return getPostUrl(notif.source);
108
+
}
109
+
return null;
110
+
};
111
+
112
+
if (loading) {
113
+
return (
114
+
<div className="notifications-page">
115
+
<div className="notifications-header">
116
+
<h1>Notifications</h1>
117
+
</div>
118
+
<div className="loading">Loading notifications...</div>
119
+
</div>
120
+
);
121
+
}
122
+
123
+
if (error && notifications.length === 0) {
124
+
return (
125
+
<div className="notifications-page">
126
+
<div className="notifications-header">
127
+
<h1>Notifications</h1>
128
+
</div>
129
+
<div className="error">Error: {error}</div>
130
+
</div>
131
+
);
132
+
}
133
+
134
+
return (
135
+
<div className="notifications-page">
136
+
<div className="notifications-header">
137
+
<h1>Notifications</h1>
138
+
</div>
139
+
<div className="notifications-list">
140
+
{notifications.map((notif) => {
141
+
const link = getNotificationLink(notif);
142
+
const content = (
143
+
<>
144
+
<div className="notification-icon">
145
+
{getNotificationIcon(notif.kind)}
146
+
</div>
147
+
<div className="notification-content">
148
+
<div className="notification-author">
149
+
{notif.author.profile?.avatar && (
150
+
<img
151
+
src={getBlobUrl(notif.author.profile.avatar, notif.author.did, 'avatar_thumbnail')}
152
+
alt="Avatar"
153
+
className="notification-avatar"
154
+
/>
155
+
)}
156
+
<div className="notification-text">
157
+
<Link to={`/profile/${notif.author.handle}`} className="notification-author-name">
158
+
{notif.author.profile?.displayName || notif.author.handle}
159
+
</Link>
160
+
{' '}
161
+
<span className="notification-action">{getNotificationText(notif)}</span>
162
+
</div>
163
+
</div>
164
+
{notif.sourcePost && (
165
+
<div className="notification-preview">
166
+
{notif.sourcePost.text}
167
+
</div>
168
+
)}
169
+
<div className="notification-time">
170
+
{formatRelativeTime(notif.createdAt)}
171
+
</div>
172
+
</div>
173
+
</>
174
+
);
175
+
176
+
return (
177
+
<div key={notif.id} className={`notification notification-${notif.kind}`}>
178
+
{link ? (
179
+
<Link to={link} className="notification-link">
180
+
{content}
181
+
</Link>
182
+
) : (
183
+
<div className="notification-inner">
184
+
{content}
185
+
</div>
186
+
)}
187
+
</div>
188
+
);
189
+
})}
190
+
{notifications.length === 0 && !loading && (
191
+
<div className="empty-notifications">
192
+
<p>No notifications yet</p>
193
+
</div>
194
+
)}
195
+
{hasMore && (
196
+
<div ref={observerTarget} className="load-more-trigger">
197
+
{loadingMore && <div className="loading-more">Loading more...</div>}
198
+
</div>
199
+
)}
200
+
{!hasMore && notifications.length > 0 && (
201
+
<div className="end-of-notifications">
202
+
<p>You're all caught up!</p>
203
+
</div>
204
+
)}
205
+
</div>
206
+
</div>
207
+
);
208
+
};
+17
frontend/src/types.ts
+17
frontend/src/types.ts
···
138
138
export interface FeedResponse {
139
139
posts: PostResponse[];
140
140
cursor: string;
141
+
}
142
+
143
+
export interface Notification {
144
+
id: number;
145
+
kind: 'reply' | 'like' | 'mention' | 'repost';
146
+
author: AuthorInfo;
147
+
source: string;
148
+
sourcePost?: {
149
+
text: string;
150
+
uri: string;
151
+
};
152
+
createdAt: string;
153
+
}
154
+
155
+
export interface NotificationsResponse {
156
+
notifications: Notification[];
157
+
cursor: string;
141
158
}
+110
handlers.go
+110
handlers.go
···
26
26
27
27
views := e.Group("/api")
28
28
views.GET("/me", s.handleGetMe)
29
+
views.GET("/notifications", s.handleGetNotifications)
29
30
views.GET("/profile/:account/post/:rkey", s.handleGetPost)
30
31
views.GET("/profile/:account", s.handleGetProfileView)
31
32
views.GET("/profile/:account/posts", s.handleGetProfilePosts)
···
635
636
if err := p.UnmarshalCBOR(bytes.NewReader(profile.Raw)); err == nil {
636
637
prof = &p
637
638
}
639
+
} else {
640
+
s.addMissingProfile(ctx, r.Did)
638
641
}
639
642
640
643
users = append(users, engagementUser{
···
693
696
if err := p.UnmarshalCBOR(bytes.NewReader(profile.Raw)); err == nil {
694
697
prof = &p
695
698
}
699
+
} else {
700
+
s.addMissingProfile(ctx, r.Did)
696
701
}
697
702
698
703
users = append(users, engagementUser{
···
759
764
if err := p.UnmarshalCBOR(bytes.NewReader(profile.Raw)); err == nil {
760
765
prof = &p
761
766
}
767
+
} else {
768
+
s.addMissingProfile(ctx, r.Did)
762
769
}
763
770
764
771
users = append(users, engagementUser{
···
822
829
823
830
return e.JSON(200, resp)
824
831
}
832
+
833
+
type notificationResponse struct {
834
+
ID uint `json:"id"`
835
+
Kind string `json:"kind"`
836
+
Author *authorInfo `json:"author"`
837
+
Source string `json:"source"`
838
+
SourcePost *struct {
839
+
Text string `json:"text"`
840
+
Uri string `json:"uri"`
841
+
} `json:"sourcePost,omitempty"`
842
+
CreatedAt string `json:"createdAt"`
843
+
}
844
+
845
+
func (s *Server) handleGetNotifications(e echo.Context) error {
846
+
ctx := e.Request().Context()
847
+
848
+
// Get cursor from query parameter (notification ID)
849
+
cursor := e.QueryParam("cursor")
850
+
limit := 50
851
+
852
+
var cursorID uint
853
+
if cursor != "" {
854
+
if _, err := fmt.Sscanf(cursor, "%d", &cursorID); err != nil {
855
+
return e.JSON(400, map[string]any{
856
+
"error": "invalid cursor",
857
+
})
858
+
}
859
+
}
860
+
861
+
// Query notifications
862
+
var notifications []Notification
863
+
query := `SELECT * FROM notifications WHERE "for" = ?`
864
+
if cursorID > 0 {
865
+
query += ` AND id < ?`
866
+
if err := s.backend.db.Raw(query+" ORDER BY created_at DESC LIMIT ?", s.myrepo.ID, cursorID, limit).Scan(¬ifications).Error; err != nil {
867
+
return err
868
+
}
869
+
} else {
870
+
if err := s.backend.db.Raw(query+" ORDER BY created_at DESC LIMIT ?", s.myrepo.ID, limit).Scan(¬ifications).Error; err != nil {
871
+
return err
872
+
}
873
+
}
874
+
875
+
// Hydrate notifications
876
+
results := []notificationResponse{}
877
+
for _, notif := range notifications {
878
+
// Get author info
879
+
author, err := s.backend.getRepoByID(ctx, notif.Author)
880
+
if err != nil {
881
+
slog.Error("failed to get repo for notification author", "error", err)
882
+
continue
883
+
}
884
+
885
+
authorInfo, err := s.getAuthorInfo(ctx, author)
886
+
if err != nil {
887
+
slog.Error("failed to get author info", "error", err)
888
+
continue
889
+
}
890
+
891
+
resp := notificationResponse{
892
+
ID: notif.ID,
893
+
Kind: notif.Kind,
894
+
Author: authorInfo,
895
+
Source: notif.Source,
896
+
CreatedAt: notif.CreatedAt.Format(time.RFC3339),
897
+
}
898
+
899
+
// Try to get source post preview for reply/mention notifications
900
+
if notif.Kind == NotifKindReply || notif.Kind == NotifKindMention {
901
+
// Parse URI to get post
902
+
p, err := s.backend.getPostByUri(ctx, notif.Source, "*")
903
+
if err == nil && p.Raw != nil && len(p.Raw) > 0 {
904
+
var fp bsky.FeedPost
905
+
if err := fp.UnmarshalCBOR(bytes.NewReader(p.Raw)); err == nil {
906
+
preview := fp.Text
907
+
if len(preview) > 100 {
908
+
preview = preview[:100] + "..."
909
+
}
910
+
resp.SourcePost = &struct {
911
+
Text string `json:"text"`
912
+
Uri string `json:"uri"`
913
+
}{
914
+
Text: preview,
915
+
Uri: notif.Source,
916
+
}
917
+
}
918
+
}
919
+
}
920
+
921
+
results = append(results, resp)
922
+
}
923
+
924
+
// Generate next cursor
925
+
var nextCursor string
926
+
if len(notifications) > 0 {
927
+
nextCursor = fmt.Sprintf("%d", notifications[len(notifications)-1].ID)
928
+
}
929
+
930
+
return e.JSON(200, map[string]any{
931
+
"notifications": results,
932
+
"cursor": nextCursor,
933
+
})
934
+
}
+14
-1448
main.go
+14
-1448
main.go
···
1
1
package main
2
2
3
3
import (
4
-
"bytes"
5
4
"context"
6
-
"errors"
7
5
"fmt"
8
6
"log"
9
7
"log/slog"
···
17
15
"time"
18
16
19
17
"github.com/bluesky-social/indigo/api/atproto"
20
-
"github.com/bluesky-social/indigo/api/bsky"
21
18
"github.com/bluesky-social/indigo/atproto/identity"
22
19
"github.com/bluesky-social/indigo/atproto/syntax"
23
20
"github.com/bluesky-social/indigo/cmd/relay/stream"
24
21
"github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
25
-
"github.com/bluesky-social/indigo/repo"
26
-
"github.com/bluesky-social/indigo/util"
27
22
"github.com/bluesky-social/indigo/util/cliutil"
28
23
"github.com/bluesky-social/indigo/xrpc"
29
24
"github.com/gorilla/websocket"
30
25
lru "github.com/hashicorp/golang-lru/v2"
31
-
"github.com/ipfs/go-cid"
32
-
"github.com/jackc/pgx/v5"
33
-
"github.com/jackc/pgx/v5/pgconn"
34
26
"github.com/jackc/pgx/v5/pgxpool"
35
27
"github.com/prometheus/client_golang/prometheus"
36
28
"github.com/prometheus/client_golang/prometheus/promauto"
37
29
"github.com/urfave/cli/v2"
38
-
"github.com/whyrusleeping/market/models"
39
-
"gorm.io/gorm"
40
-
"gorm.io/gorm/clause"
41
30
"gorm.io/gorm/logger"
42
31
)
43
32
···
98
87
db.AutoMigrate(PostGate{})
99
88
db.AutoMigrate(StarterPack{})
100
89
db.AutoMigrate(SyncInfo{})
90
+
db.AutoMigrate(Notification{})
101
91
102
92
ctx := context.TODO()
103
93
···
337
327
return resp.DID.String(), nil
338
328
}
339
329
340
-
func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
341
-
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
342
-
if err != nil {
343
-
return fmt.Errorf("failed to read event repo: %w", err)
344
-
}
345
-
346
-
for _, op := range evt.Ops {
347
-
switch op.Action {
348
-
case "create":
349
-
c, rec, err := r.GetRecordBytes(ctx, op.Path)
350
-
if err != nil {
351
-
return err
352
-
}
353
-
if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
354
-
return fmt.Errorf("create record failed: %w", err)
355
-
}
356
-
case "update":
357
-
c, rec, err := r.GetRecordBytes(ctx, op.Path)
358
-
if err != nil {
359
-
return err
360
-
}
361
-
if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil {
362
-
return fmt.Errorf("update record failed: %w", err)
363
-
}
364
-
case "delete":
365
-
if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
366
-
return fmt.Errorf("delete record failed: %w", err)
367
-
}
368
-
}
369
-
}
370
-
371
-
// TODO: sync with the Since field to make sure we don't miss events we care about
372
-
/*
373
-
if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
374
-
return fmt.Errorf("failed to update rev: %w", err)
375
-
}
376
-
*/
377
-
378
-
return nil
379
-
}
380
-
381
-
func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) {
382
-
r, ok := b.repoCache.Get(did)
383
-
if !ok {
384
-
b.reposLk.Lock()
385
-
386
-
r, ok = b.repoCache.Get(did)
387
-
if !ok {
388
-
r = &Repo{}
389
-
r.Did = did
390
-
b.repoCache.Add(did, r)
391
-
}
392
-
393
-
b.reposLk.Unlock()
394
-
}
395
-
396
-
r.Lk.Lock()
397
-
defer r.Lk.Unlock()
398
-
if r.Setup {
399
-
return r, nil
400
-
}
401
-
402
-
row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did)
403
-
404
-
err := row.Scan(&r.ID, &r.CreatedAt, &r.Did)
405
-
if err == nil {
406
-
// found it!
407
-
r.Setup = true
408
-
return r, nil
409
-
}
410
-
411
-
if err != pgx.ErrNoRows {
412
-
return nil, err
413
-
}
414
-
415
-
r.Did = did
416
-
if err := b.db.Create(r).Error; err != nil {
417
-
return nil, err
418
-
}
419
-
420
-
r.Setup = true
421
-
422
-
return r, nil
423
-
}
424
-
425
-
func (b *PostgresBackend) getOrCreateList(ctx context.Context, uri string) (*List, error) {
426
-
puri, err := util.ParseAtUri(uri)
427
-
if err != nil {
428
-
return nil, err
429
-
}
430
-
431
-
r, err := b.getOrCreateRepo(ctx, puri.Did)
432
-
if err != nil {
433
-
return nil, err
434
-
}
435
-
436
-
// TODO: needs upsert treatment when we actually find the list
437
-
var list List
438
-
if err := b.db.FirstOrCreate(&list, map[string]any{
439
-
"author": r.ID,
440
-
"rkey": puri.Rkey,
441
-
}).Error; err != nil {
442
-
return nil, err
443
-
}
444
-
return &list, nil
445
-
}
446
-
447
-
type cachedPostInfo struct {
448
-
ID uint
449
-
Author uint
450
-
}
451
-
452
-
func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) {
453
-
// getPostByUri implicitly fills the cache
454
-
p, err := b.postInfoForUri(ctx, uri)
455
-
if err != nil {
456
-
return 0, err
457
-
}
458
-
459
-
return p.ID, nil
460
-
}
461
-
462
-
func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) {
463
-
v, ok := b.postInfoCache.Get(uri)
464
-
if ok {
465
-
return v, nil
466
-
}
467
-
468
-
// getPostByUri implicitly fills the cache
469
-
p, err := b.getOrCreatePostBare(ctx, uri)
470
-
if err != nil {
471
-
return cachedPostInfo{}, err
472
-
}
473
-
474
-
return cachedPostInfo{ID: p.ID, Author: p.Author}, nil
475
-
}
476
-
477
-
func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) {
478
-
var p Post
479
-
q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2"
480
-
if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil {
481
-
if errors.Is(err, pgx.ErrNoRows) {
482
-
return nil, nil
483
-
}
484
-
return nil, err
485
-
}
486
-
487
-
return &p, nil
488
-
}
489
-
490
-
func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) {
491
-
puri, err := util.ParseAtUri(uri)
492
-
if err != nil {
493
-
return nil, err
494
-
}
495
-
496
-
r, err := b.getOrCreateRepo(ctx, puri.Did)
497
-
if err != nil {
498
-
return nil, err
499
-
}
500
-
501
-
post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
502
-
if err != nil {
503
-
return nil, err
504
-
}
505
-
506
-
if post == nil {
507
-
post = &Post{
508
-
Rkey: puri.Rkey,
509
-
Author: r.ID,
510
-
NotFound: true,
511
-
}
512
-
513
-
err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID)
514
-
if err != nil {
515
-
pgErr, ok := err.(*pgconn.PgError)
516
-
if !ok || pgErr.Code != "23505" {
517
-
return nil, err
518
-
}
519
-
520
-
out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
521
-
if err != nil {
522
-
return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
523
-
}
524
-
if out == nil {
525
-
return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey)
526
-
}
527
-
528
-
post = out
529
-
}
530
-
531
-
}
532
-
533
-
b.postInfoCache.Add(uri, cachedPostInfo{
534
-
ID: post.ID,
535
-
Author: post.Author,
536
-
})
537
-
538
-
return post, nil
539
-
}
540
-
541
-
func (b *PostgresBackend) getPostByUri(ctx context.Context, uri string, fields string) (*Post, error) {
542
-
puri, err := util.ParseAtUri(uri)
543
-
if err != nil {
544
-
return nil, err
545
-
}
546
-
547
-
r, err := b.getOrCreateRepo(ctx, puri.Did)
548
-
if err != nil {
549
-
return nil, err
550
-
}
551
-
552
-
q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?"
553
-
554
-
var post Post
555
-
if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil {
556
-
return nil, err
557
-
}
558
-
559
-
if post.ID == 0 {
560
-
post.Rkey = puri.Rkey
561
-
post.Author = r.ID
562
-
post.NotFound = true
563
-
564
-
if err := b.db.Session(&gorm.Session{
565
-
Logger: logger.Default.LogMode(logger.Silent),
566
-
}).Create(&post).Error; err != nil {
567
-
if !errors.Is(err, gorm.ErrDuplicatedKey) {
568
-
return nil, err
569
-
}
570
-
if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil {
571
-
return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
572
-
}
573
-
}
574
-
575
-
}
576
-
577
-
b.postInfoCache.Add(uri, cachedPostInfo{
578
-
ID: post.ID,
579
-
Author: post.Author,
580
-
})
581
-
582
-
return &post, nil
583
-
}
584
-
585
-
func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) {
586
-
lrev, ok := b.revCache.Get(rr.ID)
587
-
if ok {
588
-
return lrev, nil
589
-
}
590
-
591
-
var rev string
592
-
if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil {
593
-
if errors.Is(err, pgx.ErrNoRows) {
594
-
return "", nil
595
-
}
596
-
return "", err
597
-
}
598
-
599
-
if rev != "" {
600
-
b.revCache.Add(rr.ID, rev)
601
-
}
602
-
return rev, nil
603
-
}
604
-
605
-
func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
606
-
start := time.Now()
607
-
608
-
rr, err := b.getOrCreateRepo(ctx, repo)
609
-
if err != nil {
610
-
return fmt.Errorf("get user failed: %w", err)
611
-
}
612
-
613
-
lrev, err := b.revForRepo(rr)
614
-
if err != nil {
615
-
return err
616
-
}
617
-
if lrev != "" {
618
-
if rev < lrev {
619
-
slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
620
-
return nil
621
-
}
622
-
}
623
-
624
-
parts := strings.Split(path, "/")
625
-
if len(parts) != 2 {
626
-
return fmt.Errorf("invalid path in HandleCreate: %q", path)
627
-
}
628
-
col := parts[0]
629
-
rkey := parts[1]
630
-
631
-
defer func() {
632
-
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
633
-
}()
634
-
635
-
if rkey == "" {
636
-
fmt.Printf("messed up path: %q\n", rkey)
637
-
}
638
-
639
-
switch col {
640
-
case "app.bsky.feed.post":
641
-
if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
642
-
return err
643
-
}
644
-
case "app.bsky.feed.like":
645
-
if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
646
-
return err
647
-
}
648
-
case "app.bsky.feed.repost":
649
-
if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
650
-
return err
651
-
}
652
-
case "app.bsky.graph.follow":
653
-
if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
654
-
return err
655
-
}
656
-
case "app.bsky.graph.block":
657
-
if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
658
-
return err
659
-
}
660
-
case "app.bsky.graph.list":
661
-
if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
662
-
return err
663
-
}
664
-
case "app.bsky.graph.listitem":
665
-
if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
666
-
return err
667
-
}
668
-
case "app.bsky.graph.listblock":
669
-
if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
670
-
return err
671
-
}
672
-
case "app.bsky.actor.profile":
673
-
if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
674
-
return err
675
-
}
676
-
case "app.bsky.feed.generator":
677
-
if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
678
-
return err
679
-
}
680
-
case "app.bsky.feed.threadgate":
681
-
if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
682
-
return err
683
-
}
684
-
case "chat.bsky.actor.declaration":
685
-
if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
686
-
return err
687
-
}
688
-
case "app.bsky.feed.postgate":
689
-
if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil {
690
-
return err
691
-
}
692
-
case "app.bsky.graph.starterpack":
693
-
if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil {
694
-
return err
695
-
}
696
-
default:
697
-
slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev)
698
-
}
699
-
700
-
b.revCache.Add(rr.ID, rev)
701
-
return nil
702
-
}
703
-
704
-
type PostgresBackend struct {
705
-
db *gorm.DB
706
-
pgx *pgxpool.Pool
707
-
s *Server
708
-
709
-
relevantDids map[string]bool
710
-
rdLk sync.Mutex
711
-
712
-
revCache *lru.TwoQueueCache[uint, string]
713
-
714
-
repoCache *lru.TwoQueueCache[string, *Repo]
715
-
reposLk sync.Mutex
716
-
717
-
postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
718
-
}
719
-
720
-
func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error {
721
-
r, err := b.getOrCreateRepo(ctx, user)
722
-
if err != nil {
723
-
return err
724
-
}
725
-
726
-
var si SyncInfo
727
-
if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil {
728
-
return err
729
-
}
730
-
731
-
// not found
732
-
if si.Repo == 0 {
733
-
if err := b.db.Create(&SyncInfo{
734
-
Repo: r.ID,
735
-
}).Error; err != nil {
736
-
return err
737
-
}
738
-
}
739
-
740
-
if si.FollowsSynced {
741
-
return nil
742
-
}
743
-
744
-
var follows []Follow
745
-
var cursor string
746
-
for {
747
-
resp, err := atproto.RepoListRecords(ctx, b.s.client, "app.bsky.graph.follow", cursor, 100, b.s.mydid, false)
748
-
if err != nil {
749
-
return err
750
-
}
751
-
752
-
for _, rec := range resp.Records {
753
-
if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok {
754
-
fr, err := b.getOrCreateRepo(ctx, fol.Subject)
755
-
if err != nil {
756
-
return err
757
-
}
758
-
759
-
puri, err := syntax.ParseATURI(rec.Uri)
760
-
if err != nil {
761
-
return err
762
-
}
763
-
764
-
follows = append(follows, Follow{
765
-
Created: time.Now(),
766
-
Indexed: time.Now(),
767
-
Rkey: puri.RecordKey().String(),
768
-
Author: r.ID,
769
-
Subject: fr.ID,
770
-
})
771
-
}
772
-
}
773
-
774
-
if resp.Cursor == nil || len(resp.Records) == 0 {
775
-
break
776
-
}
777
-
cursor = *resp.Cursor
778
-
}
779
-
780
-
if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil {
781
-
return err
782
-
}
783
-
784
-
if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil {
785
-
return err
786
-
}
787
-
788
-
fmt.Println("Got follows: ", len(follows))
789
-
790
-
return nil
791
-
}
792
-
793
-
func (b *PostgresBackend) loadRelevantDids() error {
794
-
ctx := context.TODO()
795
-
796
-
if err := b.ensureFollowsScraped(ctx, b.s.mydid); err != nil {
797
-
return fmt.Errorf("failed to scrape follows: %w", err)
798
-
}
799
-
800
-
r, err := b.getOrCreateRepo(ctx, b.s.mydid)
801
-
if err != nil {
802
-
return err
803
-
}
804
-
805
-
var dids []string
806
-
if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil {
807
-
return err
808
-
}
809
-
810
-
b.relevantDids[b.s.mydid] = true
811
-
for _, d := range dids {
812
-
fmt.Println("adding did: ", d)
813
-
b.relevantDids[d] = true
814
-
}
815
-
816
-
return nil
817
-
}
818
-
819
-
type SyncInfo struct {
820
-
Repo uint `gorm:"index"`
821
-
FollowsSynced bool
822
-
Rev string
823
-
}
824
-
825
-
func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
826
-
var id uint
827
-
var notfound bool
828
-
if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, ¬found); err != nil {
829
-
if errors.Is(err, pgx.ErrNoRows) {
830
-
return false, nil
831
-
}
832
-
return false, err
833
-
}
834
-
835
-
if id != 0 && !notfound {
836
-
return true, nil
837
-
}
838
-
839
-
return false, nil
840
-
}
841
-
842
-
func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
843
-
exists, err := b.checkPostExists(ctx, repo, rkey)
844
-
if err != nil {
845
-
return err
846
-
}
847
-
848
-
// still technically a race condition if two creates for the same post happen concurrently... probably fine
849
-
if exists {
850
-
return nil
851
-
}
852
-
853
-
var rec bsky.FeedPost
854
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
855
-
return err
856
-
}
857
-
858
-
reldids := []string{repo.Did}
859
-
// care about a post if its in a thread of a user we are interested in
860
-
if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil {
861
-
reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri)
862
-
}
863
-
// TODO: maybe also care if its mentioning a user we care about or quoting a user we care about?
864
-
if !b.anyRelevantIdents(reldids...) {
865
-
return nil
866
-
}
867
-
868
-
uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey
869
-
slog.Warn("adding post", "uri", uri)
870
-
871
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
872
-
if err != nil {
873
-
return fmt.Errorf("invalid timestamp: %w", err)
874
-
}
875
-
876
-
p := Post{
877
-
Created: created.Time(),
878
-
Indexed: time.Now(),
879
-
Author: repo.ID,
880
-
Rkey: rkey,
881
-
Raw: recb,
882
-
Cid: cc.String(),
883
-
}
884
-
885
-
if rec.Reply != nil && rec.Reply.Parent != nil {
886
-
if rec.Reply.Root == nil {
887
-
return fmt.Errorf("post reply had nil root")
888
-
}
889
-
890
-
pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri)
891
-
if err != nil {
892
-
return fmt.Errorf("getting reply parent: %w", err)
893
-
}
894
-
895
-
p.ReplyTo = pinfo.ID
896
-
p.ReplyToUsr = pinfo.Author
897
-
898
-
thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri)
899
-
if err != nil {
900
-
return fmt.Errorf("getting thread root: %w", err)
901
-
}
902
-
903
-
p.InThread = thread
904
-
}
905
-
906
-
if rec.Embed != nil {
907
-
var rpref string
908
-
if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
909
-
rpref = rec.Embed.EmbedRecord.Record.Uri
910
-
}
911
-
if rec.Embed.EmbedRecordWithMedia != nil &&
912
-
rec.Embed.EmbedRecordWithMedia.Record != nil &&
913
-
rec.Embed.EmbedRecordWithMedia.Record.Record != nil {
914
-
rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri
915
-
}
916
-
917
-
if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") {
918
-
rp, err := b.postIDForUri(ctx, rpref)
919
-
if err != nil {
920
-
return fmt.Errorf("getting quote subject: %w", err)
921
-
}
922
-
923
-
p.Reposting = rp
924
-
}
925
-
}
926
-
927
-
if err := b.doPostCreate(ctx, &p); err != nil {
928
-
return err
929
-
}
930
-
931
-
b.postInfoCache.Add(uri, cachedPostInfo{
932
-
ID: p.ID,
933
-
Author: p.Author,
934
-
})
935
-
936
-
return nil
937
-
}
938
-
939
-
func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error {
940
-
/*
941
-
if err := b.db.Clauses(clause.OnConflict{
942
-
Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}},
943
-
DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}),
944
-
}).Create(p).Error; err != nil {
945
-
return err
946
-
}
947
-
*/
948
-
949
-
query := `
950
-
INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread)
951
-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
952
-
ON CONFLICT (author, rkey)
953
-
DO UPDATE SET
954
-
cid = $3,
955
-
not_found = $4,
956
-
raw = $5,
957
-
created = $6,
958
-
indexed = $7,
959
-
reposting = $8,
960
-
reply_to = $9,
961
-
reply_to_usr = $10,
962
-
in_thread = $11
963
-
RETURNING id
964
-
`
965
-
966
-
// Execute the query with parameters from the Post struct
967
-
if err := b.pgx.QueryRow(
968
-
ctx,
969
-
query,
970
-
p.Author,
971
-
p.Rkey,
972
-
p.Cid,
973
-
p.NotFound,
974
-
p.Raw,
975
-
p.Created,
976
-
p.Indexed,
977
-
p.Reposting,
978
-
p.ReplyTo,
979
-
p.ReplyToUsr,
980
-
p.InThread,
981
-
).Scan(&p.ID); err != nil {
982
-
return err
983
-
}
984
-
985
-
return nil
986
-
}
987
-
988
-
func (b *PostgresBackend) didIsRelevant(did string) bool {
989
-
b.rdLk.Lock()
990
-
defer b.rdLk.Unlock()
991
-
return b.relevantDids[did]
992
-
}
993
-
994
-
func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool {
995
-
for _, id := range idents {
996
-
if strings.HasPrefix(id, "did:") {
997
-
if b.didIsRelevant(id) {
998
-
return true
999
-
}
1000
-
} else if strings.HasPrefix(id, "at://") {
1001
-
puri, err := syntax.ParseATURI(id)
1002
-
if err != nil {
1003
-
continue
1004
-
}
1005
-
1006
-
if b.didIsRelevant(puri.Authority().String()) {
1007
-
return true
1008
-
}
1009
-
}
1010
-
}
1011
-
1012
-
return false
1013
-
}
1014
-
1015
-
func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1016
-
var rec bsky.FeedLike
1017
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1018
-
return err
1019
-
}
1020
-
1021
-
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
1022
-
return nil
1023
-
}
1024
-
1025
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1026
-
if err != nil {
1027
-
return fmt.Errorf("invalid timestamp: %w", err)
1028
-
}
1029
-
1030
-
pid, err := b.postIDForUri(ctx, rec.Subject.Uri)
1031
-
if err != nil {
1032
-
return fmt.Errorf("getting like subject: %w", err)
1033
-
}
1034
-
1035
-
if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pid, cc.String()); err != nil {
1036
-
pgErr, ok := err.(*pgconn.PgError)
1037
-
if ok && pgErr.Code == "23505" {
1038
-
return nil
1039
-
}
1040
-
return err
1041
-
}
1042
-
1043
-
return nil
1044
-
}
1045
-
1046
-
func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1047
-
var rec bsky.FeedRepost
1048
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1049
-
return err
1050
-
}
1051
-
1052
-
if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) {
1053
-
return nil
1054
-
}
1055
-
1056
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1057
-
if err != nil {
1058
-
return fmt.Errorf("invalid timestamp: %w", err)
1059
-
}
1060
-
1061
-
pid, err := b.postIDForUri(ctx, rec.Subject.Uri)
1062
-
if err != nil {
1063
-
return fmt.Errorf("getting repost subject: %w", err)
1064
-
}
1065
-
1066
-
if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pid); err != nil {
1067
-
pgErr, ok := err.(*pgconn.PgError)
1068
-
if ok && pgErr.Code == "23505" {
1069
-
return nil
1070
-
}
1071
-
return err
1072
-
}
1073
-
1074
-
return nil
1075
-
}
1076
-
1077
-
func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1078
-
var rec bsky.GraphFollow
1079
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1080
-
return err
1081
-
}
1082
-
1083
-
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
1084
-
return nil
1085
-
}
1086
-
1087
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1088
-
if err != nil {
1089
-
return fmt.Errorf("invalid timestamp: %w", err)
1090
-
}
1091
-
1092
-
subj, err := b.getOrCreateRepo(ctx, rec.Subject)
1093
-
if err != nil {
1094
-
return err
1095
-
}
1096
-
1097
-
if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil {
1098
-
return err
1099
-
}
1100
-
1101
-
return nil
1102
-
}
1103
-
1104
-
func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1105
-
var rec bsky.GraphBlock
1106
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1107
-
return err
1108
-
}
1109
-
1110
-
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
1111
-
return nil
1112
-
}
1113
-
1114
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1115
-
if err != nil {
1116
-
return fmt.Errorf("invalid timestamp: %w", err)
1117
-
}
1118
-
1119
-
subj, err := b.getOrCreateRepo(ctx, rec.Subject)
1120
-
if err != nil {
1121
-
return err
1122
-
}
1123
-
1124
-
if err := b.db.Create(&Block{
1125
-
Created: created.Time(),
1126
-
Indexed: time.Now(),
1127
-
Author: repo.ID,
1128
-
Rkey: rkey,
1129
-
Subject: subj.ID,
1130
-
}).Error; err != nil {
1131
-
return err
1132
-
}
1133
-
1134
-
return nil
1135
-
}
1136
-
1137
-
func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1138
-
var rec bsky.GraphList
1139
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1140
-
return err
1141
-
}
1142
-
1143
-
if !b.anyRelevantIdents(repo.Did) {
1144
-
return nil
1145
-
}
1146
-
1147
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1148
-
if err != nil {
1149
-
return fmt.Errorf("invalid timestamp: %w", err)
1150
-
}
1151
-
1152
-
if err := b.db.Create(&List{
1153
-
Created: created.Time(),
1154
-
Indexed: time.Now(),
1155
-
Author: repo.ID,
1156
-
Rkey: rkey,
1157
-
Raw: recb,
1158
-
}).Error; err != nil {
1159
-
return err
1160
-
}
1161
-
1162
-
return nil
1163
-
}
1164
-
1165
-
func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1166
-
var rec bsky.GraphListitem
1167
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1168
-
return err
1169
-
}
1170
-
if !b.anyRelevantIdents(repo.Did) {
1171
-
return nil
1172
-
}
1173
-
1174
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1175
-
if err != nil {
1176
-
return fmt.Errorf("invalid timestamp: %w", err)
1177
-
}
1178
-
1179
-
subj, err := b.getOrCreateRepo(ctx, rec.Subject)
1180
-
if err != nil {
1181
-
return err
1182
-
}
1183
-
1184
-
list, err := b.getOrCreateList(ctx, rec.List)
1185
-
if err != nil {
1186
-
return err
1187
-
}
1188
-
1189
-
if err := b.db.Create(&ListItem{
1190
-
Created: created.Time(),
1191
-
Indexed: time.Now(),
1192
-
Author: repo.ID,
1193
-
Rkey: rkey,
1194
-
Subject: subj.ID,
1195
-
List: list.ID,
1196
-
}).Error; err != nil {
1197
-
return err
1198
-
}
1199
-
1200
-
return nil
1201
-
}
1202
-
1203
-
func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1204
-
var rec bsky.GraphListblock
1205
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1206
-
return err
1207
-
}
1208
-
1209
-
if !b.anyRelevantIdents(repo.Did, rec.Subject) {
1210
-
return nil
1211
-
}
1212
-
1213
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1214
-
if err != nil {
1215
-
return fmt.Errorf("invalid timestamp: %w", err)
1216
-
}
1217
-
1218
-
list, err := b.getOrCreateList(ctx, rec.Subject)
1219
-
if err != nil {
1220
-
return err
1221
-
}
1222
-
1223
-
if err := b.db.Create(&ListBlock{
1224
-
Created: created.Time(),
1225
-
Indexed: time.Now(),
1226
-
Author: repo.ID,
1227
-
Rkey: rkey,
1228
-
List: list.ID,
1229
-
}).Error; err != nil {
1230
-
return err
1231
-
}
1232
-
1233
-
return nil
1234
-
}
1235
-
1236
-
func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
1237
-
if !b.anyRelevantIdents(repo.Did) {
1238
-
return nil
1239
-
}
1240
-
1241
-
if err := b.db.Create(&Profile{
1242
-
//Created: created.Time(),
1243
-
Indexed: time.Now(),
1244
-
Repo: repo.ID,
1245
-
Raw: recb,
1246
-
Rev: rev,
1247
-
}).Error; err != nil {
1248
-
return err
1249
-
}
1250
-
1251
-
return nil
1252
-
}
1253
-
1254
-
func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error {
1255
-
if !b.anyRelevantIdents(repo.Did) {
1256
-
return nil
1257
-
}
1258
-
1259
-
if err := b.db.Create(&Profile{
1260
-
Indexed: time.Now(),
1261
-
Repo: repo.ID,
1262
-
Raw: recb,
1263
-
Rev: rev,
1264
-
}).Error; err != nil {
1265
-
return err
1266
-
}
1267
-
1268
-
return nil
1269
-
}
1270
-
1271
-
func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1272
-
if !b.anyRelevantIdents(repo.Did) {
1273
-
return nil
1274
-
}
1275
-
1276
-
var rec bsky.FeedGenerator
1277
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1278
-
return err
1279
-
}
1280
-
1281
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1282
-
if err != nil {
1283
-
return fmt.Errorf("invalid timestamp: %w", err)
1284
-
}
1285
-
1286
-
if err := b.db.Create(&FeedGenerator{
1287
-
Created: created.Time(),
1288
-
Indexed: time.Now(),
1289
-
Author: repo.ID,
1290
-
Rkey: rkey,
1291
-
Did: rec.Did,
1292
-
}).Error; err != nil {
1293
-
return err
1294
-
}
1295
-
1296
-
return nil
1297
-
}
1298
-
1299
-
func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1300
-
if !b.anyRelevantIdents(repo.Did) {
1301
-
return nil
1302
-
}
1303
-
var rec bsky.FeedThreadgate
1304
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1305
-
return err
1306
-
}
1307
-
1308
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1309
-
if err != nil {
1310
-
return fmt.Errorf("invalid timestamp: %w", err)
1311
-
}
330
+
const (
331
+
NotifKindReply = "reply"
332
+
NotifKindLike = "like"
333
+
NotifKindMention = "mention"
334
+
NotifKindRepost = "repost"
335
+
)
1312
336
1313
-
pid, err := b.postIDForUri(ctx, rec.Post)
1314
-
if err != nil {
1315
-
return err
1316
-
}
1317
-
1318
-
if err := b.db.Create(&ThreadGate{
1319
-
Created: created.Time(),
1320
-
Indexed: time.Now(),
1321
-
Author: repo.ID,
1322
-
Rkey: rkey,
1323
-
Post: pid,
1324
-
}).Error; err != nil {
1325
-
return err
1326
-
}
1327
-
1328
-
return nil
1329
-
}
1330
-
1331
-
func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1332
-
// TODO: maybe track these?
1333
-
return nil
1334
-
}
1335
-
1336
-
func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1337
-
if !b.anyRelevantIdents(repo.Did) {
1338
-
return nil
1339
-
}
1340
-
var rec bsky.FeedPostgate
1341
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1342
-
return err
1343
-
}
1344
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1345
-
if err != nil {
1346
-
return fmt.Errorf("invalid timestamp: %w", err)
1347
-
}
1348
-
1349
-
refPost, err := b.postInfoForUri(ctx, rec.Post)
1350
-
if err != nil {
1351
-
return err
1352
-
}
1353
-
1354
-
if err := b.db.Create(&PostGate{
1355
-
Created: created.Time(),
1356
-
Indexed: time.Now(),
1357
-
Author: repo.ID,
1358
-
Rkey: rkey,
1359
-
Subject: refPost.ID,
1360
-
Raw: recb,
1361
-
}).Error; err != nil {
1362
-
return err
1363
-
}
1364
-
1365
-
return nil
1366
-
}
1367
-
1368
-
func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error {
1369
-
if !b.anyRelevantIdents(repo.Did) {
1370
-
return nil
1371
-
}
1372
-
var rec bsky.GraphStarterpack
1373
-
if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil {
1374
-
return err
1375
-
}
1376
-
created, err := syntax.ParseDatetimeLenient(rec.CreatedAt)
1377
-
if err != nil {
1378
-
return fmt.Errorf("invalid timestamp: %w", err)
1379
-
}
1380
-
1381
-
list, err := b.getOrCreateList(ctx, rec.List)
1382
-
if err != nil {
1383
-
return err
1384
-
}
1385
-
1386
-
if err := b.db.Create(&StarterPack{
1387
-
Created: created.Time(),
1388
-
Indexed: time.Now(),
1389
-
Author: repo.ID,
1390
-
Rkey: rkey,
1391
-
Raw: recb,
1392
-
List: list.ID,
1393
-
}).Error; err != nil {
1394
-
return err
1395
-
}
1396
-
1397
-
return nil
1398
-
}
1399
-
1400
-
func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
1401
-
start := time.Now()
1402
-
1403
-
rr, err := b.getOrCreateRepo(ctx, repo)
1404
-
if err != nil {
1405
-
return fmt.Errorf("get user failed: %w", err)
1406
-
}
1407
-
1408
-
lrev, err := b.revForRepo(rr)
1409
-
if err != nil {
1410
-
return err
1411
-
}
1412
-
if lrev != "" {
1413
-
if rev < lrev {
1414
-
//slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path)
1415
-
return nil
1416
-
}
1417
-
}
1418
-
1419
-
parts := strings.Split(path, "/")
1420
-
if len(parts) != 2 {
1421
-
return fmt.Errorf("invalid path in HandleCreate: %q", path)
1422
-
}
1423
-
col := parts[0]
1424
-
rkey := parts[1]
1425
-
1426
-
defer func() {
1427
-
handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds()))
1428
-
}()
1429
-
1430
-
if rkey == "" {
1431
-
fmt.Printf("messed up path: %q\n", rkey)
1432
-
}
1433
-
1434
-
switch col {
1435
-
/*
1436
-
case "app.bsky.feed.post":
1437
-
if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil {
1438
-
return err
1439
-
}
1440
-
case "app.bsky.feed.like":
1441
-
if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil {
1442
-
return err
1443
-
}
1444
-
case "app.bsky.feed.repost":
1445
-
if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil {
1446
-
return err
1447
-
}
1448
-
case "app.bsky.graph.follow":
1449
-
if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil {
1450
-
return err
1451
-
}
1452
-
case "app.bsky.graph.block":
1453
-
if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil {
1454
-
return err
1455
-
}
1456
-
case "app.bsky.graph.list":
1457
-
if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil {
1458
-
return err
1459
-
}
1460
-
case "app.bsky.graph.listitem":
1461
-
if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil {
1462
-
return err
1463
-
}
1464
-
case "app.bsky.graph.listblock":
1465
-
if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil {
1466
-
return err
1467
-
}
1468
-
*/
1469
-
case "app.bsky.actor.profile":
1470
-
if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil {
1471
-
return err
1472
-
}
1473
-
/*
1474
-
case "app.bsky.feed.generator":
1475
-
if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil {
1476
-
return err
1477
-
}
1478
-
case "app.bsky.feed.threadgate":
1479
-
if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil {
1480
-
return err
1481
-
}
1482
-
case "chat.bsky.actor.declaration":
1483
-
if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil {
1484
-
return err
1485
-
}
1486
-
*/
1487
-
default:
1488
-
slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev)
1489
-
}
1490
-
1491
-
return nil
1492
-
}
1493
-
1494
-
func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
1495
-
start := time.Now()
1496
-
1497
-
rr, err := b.getOrCreateRepo(ctx, repo)
1498
-
if err != nil {
1499
-
return fmt.Errorf("get user failed: %w", err)
1500
-
}
1501
-
1502
-
lrev, ok := b.revCache.Get(rr.ID)
1503
-
if ok {
1504
-
if rev < lrev {
1505
-
//slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev)
1506
-
return nil
1507
-
}
1508
-
}
1509
-
1510
-
parts := strings.Split(path, "/")
1511
-
if len(parts) != 2 {
1512
-
return fmt.Errorf("invalid path in HandleDelete: %q", path)
1513
-
}
1514
-
col := parts[0]
1515
-
rkey := parts[1]
1516
-
1517
-
defer func() {
1518
-
handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds()))
1519
-
}()
1520
-
1521
-
switch col {
1522
-
case "app.bsky.feed.post":
1523
-
if err := b.HandleDeletePost(ctx, rr, rkey); err != nil {
1524
-
return err
1525
-
}
1526
-
case "app.bsky.feed.like":
1527
-
if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil {
1528
-
return err
1529
-
}
1530
-
case "app.bsky.feed.repost":
1531
-
if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil {
1532
-
return err
1533
-
}
1534
-
case "app.bsky.graph.follow":
1535
-
if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil {
1536
-
return err
1537
-
}
1538
-
case "app.bsky.graph.block":
1539
-
if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil {
1540
-
return err
1541
-
}
1542
-
case "app.bsky.graph.list":
1543
-
if err := b.HandleDeleteList(ctx, rr, rkey); err != nil {
1544
-
return err
1545
-
}
1546
-
case "app.bsky.graph.listitem":
1547
-
if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil {
1548
-
return err
1549
-
}
1550
-
case "app.bsky.graph.listblock":
1551
-
if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil {
1552
-
return err
1553
-
}
1554
-
case "app.bsky.actor.profile":
1555
-
if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil {
1556
-
return err
1557
-
}
1558
-
case "app.bsky.feed.generator":
1559
-
if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil {
1560
-
return err
1561
-
}
1562
-
case "app.bsky.feed.threadgate":
1563
-
if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil {
1564
-
return err
1565
-
}
1566
-
default:
1567
-
slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev)
1568
-
}
1569
-
1570
-
b.revCache.Add(rr.ID, rev)
1571
-
return nil
1572
-
}
1573
-
1574
-
func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error {
1575
-
var p Post
1576
-
if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1577
-
return err
1578
-
}
1579
-
1580
-
if p.ID == 0 {
1581
-
//slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey)
1582
-
return nil
1583
-
}
1584
-
1585
-
if err := b.db.Delete(&Post{}, p.ID).Error; err != nil {
1586
-
return err
1587
-
}
1588
-
1589
-
return nil
1590
-
}
1591
-
1592
-
func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error {
1593
-
var like Like
1594
-
if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1595
-
return err
1596
-
}
1597
-
1598
-
if like.ID == 0 {
1599
-
//slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey)
1600
-
return nil
1601
-
}
1602
-
1603
-
if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil {
1604
-
return err
1605
-
}
1606
-
1607
-
return nil
1608
-
}
1609
-
1610
-
func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error {
1611
-
var repost Repost
1612
-
if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1613
-
return err
1614
-
}
1615
-
1616
-
if repost.ID == 0 {
1617
-
//return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey)
1618
-
return nil
1619
-
}
1620
-
1621
-
if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil {
1622
-
return err
1623
-
}
1624
-
1625
-
return nil
1626
-
}
1627
-
1628
-
func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error {
1629
-
var follow Follow
1630
-
if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1631
-
return err
1632
-
}
1633
-
1634
-
if follow.ID == 0 {
1635
-
//slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey)
1636
-
return nil
1637
-
}
1638
-
1639
-
if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil {
1640
-
return err
1641
-
}
1642
-
1643
-
return nil
1644
-
}
1645
-
1646
-
func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error {
1647
-
var block Block
1648
-
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1649
-
return err
1650
-
}
1651
-
1652
-
if block.ID == 0 {
1653
-
//slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey)
1654
-
return nil
1655
-
}
1656
-
1657
-
if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil {
1658
-
return err
1659
-
}
1660
-
1661
-
return nil
1662
-
}
1663
-
1664
-
func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error {
1665
-
var list List
1666
-
if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1667
-
return err
1668
-
}
1669
-
1670
-
if list.ID == 0 {
1671
-
return nil
1672
-
//return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey)
1673
-
}
1674
-
1675
-
if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil {
1676
-
return err
1677
-
}
1678
-
1679
-
return nil
1680
-
}
1681
-
1682
-
func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error {
1683
-
var item ListItem
1684
-
if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1685
-
return err
1686
-
}
1687
-
1688
-
if item.ID == 0 {
1689
-
return nil
1690
-
//return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey)
1691
-
}
1692
-
1693
-
if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil {
1694
-
return err
1695
-
}
1696
-
1697
-
return nil
1698
-
}
1699
-
1700
-
func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error {
1701
-
var block ListBlock
1702
-
if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1703
-
return err
1704
-
}
1705
-
1706
-
if block.ID == 0 {
1707
-
return nil
1708
-
//return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey)
1709
-
}
1710
-
1711
-
if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil {
1712
-
return err
1713
-
}
1714
-
1715
-
return nil
1716
-
}
1717
-
1718
-
func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error {
1719
-
var feedgen FeedGenerator
1720
-
if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1721
-
return err
1722
-
}
1723
-
1724
-
if feedgen.ID == 0 {
1725
-
return nil
1726
-
//return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey)
1727
-
}
1728
-
1729
-
if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil {
1730
-
return err
1731
-
}
1732
-
1733
-
return nil
1734
-
}
1735
-
1736
-
func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error {
1737
-
var threadgate ThreadGate
1738
-
if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil {
1739
-
return err
1740
-
}
1741
-
1742
-
if threadgate.ID == 0 {
1743
-
return nil
1744
-
//return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey)
1745
-
}
1746
-
1747
-
if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil {
1748
-
return err
1749
-
}
1750
-
1751
-
return nil
1752
-
}
1753
-
1754
-
func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error {
1755
-
var profile Profile
1756
-
if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil {
1757
-
return err
1758
-
}
1759
-
1760
-
if profile.ID == 0 {
1761
-
return nil
1762
-
}
1763
-
1764
-
if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil {
1765
-
return err
1766
-
}
1767
-
1768
-
return nil
1769
-
}
1770
-
1771
-
func (b *PostgresBackend) getRepoByID(ctx context.Context, id uint) (*models.Repo, error) {
1772
-
var r models.Repo
1773
-
if err := b.db.Find(&r, "id = ?", id).Error; err != nil {
1774
-
return nil, err
1775
-
}
1776
-
1777
-
return &r, nil
337
+
func (s *Server) AddNotification(ctx context.Context, forUser, author uint, recordUri string, kind string) error {
338
+
return s.backend.db.Create(&Notification{
339
+
For: forUser,
340
+
Author: author,
341
+
Source: recordUri,
342
+
Kind: kind,
343
+
}).Error
1778
344
}
+10
models.go
+10
models.go
···
4
4
"time"
5
5
6
6
"github.com/whyrusleeping/market/models"
7
+
"gorm.io/gorm"
7
8
)
8
9
9
10
type Repo = models.Repo
···
30
31
Subject uint
31
32
Cid string
32
33
}
34
+
35
+
type Notification struct {
36
+
gorm.Model
37
+
For uint
38
+
39
+
Author uint
40
+
Source string
41
+
Kind string
42
+
}
+402
pgbackend.go
+402
pgbackend.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"errors"
6
+
"fmt"
7
+
"strings"
8
+
"time"
9
+
10
+
"github.com/bluesky-social/indigo/api/atproto"
11
+
"github.com/bluesky-social/indigo/api/bsky"
12
+
"github.com/bluesky-social/indigo/atproto/syntax"
13
+
"github.com/bluesky-social/indigo/util"
14
+
"github.com/jackc/pgx/v5"
15
+
"github.com/jackc/pgx/v5/pgconn"
16
+
"github.com/whyrusleeping/market/models"
17
+
"gorm.io/gorm"
18
+
"gorm.io/gorm/clause"
19
+
"gorm.io/gorm/logger"
20
+
)
21
+
22
+
func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) {
23
+
r, ok := b.repoCache.Get(did)
24
+
if !ok {
25
+
b.reposLk.Lock()
26
+
27
+
r, ok = b.repoCache.Get(did)
28
+
if !ok {
29
+
r = &Repo{}
30
+
r.Did = did
31
+
b.repoCache.Add(did, r)
32
+
}
33
+
34
+
b.reposLk.Unlock()
35
+
}
36
+
37
+
r.Lk.Lock()
38
+
defer r.Lk.Unlock()
39
+
if r.Setup {
40
+
return r, nil
41
+
}
42
+
43
+
row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did)
44
+
45
+
err := row.Scan(&r.ID, &r.CreatedAt, &r.Did)
46
+
if err == nil {
47
+
// found it!
48
+
r.Setup = true
49
+
return r, nil
50
+
}
51
+
52
+
if err != pgx.ErrNoRows {
53
+
return nil, err
54
+
}
55
+
56
+
r.Did = did
57
+
if err := b.db.Create(r).Error; err != nil {
58
+
return nil, err
59
+
}
60
+
61
+
r.Setup = true
62
+
63
+
return r, nil
64
+
}
65
+
66
+
func (b *PostgresBackend) getOrCreateList(ctx context.Context, uri string) (*List, error) {
67
+
puri, err := util.ParseAtUri(uri)
68
+
if err != nil {
69
+
return nil, err
70
+
}
71
+
72
+
r, err := b.getOrCreateRepo(ctx, puri.Did)
73
+
if err != nil {
74
+
return nil, err
75
+
}
76
+
77
+
// TODO: needs upsert treatment when we actually find the list
78
+
var list List
79
+
if err := b.db.FirstOrCreate(&list, map[string]any{
80
+
"author": r.ID,
81
+
"rkey": puri.Rkey,
82
+
}).Error; err != nil {
83
+
return nil, err
84
+
}
85
+
return &list, nil
86
+
}
87
+
88
+
type cachedPostInfo struct {
89
+
ID uint
90
+
Author uint
91
+
}
92
+
93
+
func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) {
94
+
// getPostByUri implicitly fills the cache
95
+
p, err := b.postInfoForUri(ctx, uri)
96
+
if err != nil {
97
+
return 0, err
98
+
}
99
+
100
+
return p.ID, nil
101
+
}
102
+
103
+
func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) {
104
+
v, ok := b.postInfoCache.Get(uri)
105
+
if ok {
106
+
return v, nil
107
+
}
108
+
109
+
// getPostByUri implicitly fills the cache
110
+
p, err := b.getOrCreatePostBare(ctx, uri)
111
+
if err != nil {
112
+
return cachedPostInfo{}, err
113
+
}
114
+
115
+
return cachedPostInfo{ID: p.ID, Author: p.Author}, nil
116
+
}
117
+
118
+
func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) {
119
+
var p Post
120
+
q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2"
121
+
if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil {
122
+
if errors.Is(err, pgx.ErrNoRows) {
123
+
return nil, nil
124
+
}
125
+
return nil, err
126
+
}
127
+
128
+
return &p, nil
129
+
}
130
+
131
+
func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) {
132
+
puri, err := util.ParseAtUri(uri)
133
+
if err != nil {
134
+
return nil, err
135
+
}
136
+
137
+
r, err := b.getOrCreateRepo(ctx, puri.Did)
138
+
if err != nil {
139
+
return nil, err
140
+
}
141
+
142
+
post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
143
+
if err != nil {
144
+
return nil, err
145
+
}
146
+
147
+
if post == nil {
148
+
post = &Post{
149
+
Rkey: puri.Rkey,
150
+
Author: r.ID,
151
+
NotFound: true,
152
+
}
153
+
154
+
err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID)
155
+
if err != nil {
156
+
pgErr, ok := err.(*pgconn.PgError)
157
+
if !ok || pgErr.Code != "23505" {
158
+
return nil, err
159
+
}
160
+
161
+
out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
162
+
if err != nil {
163
+
return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
164
+
}
165
+
if out == nil {
166
+
return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey)
167
+
}
168
+
169
+
post = out
170
+
}
171
+
172
+
}
173
+
174
+
b.postInfoCache.Add(uri, cachedPostInfo{
175
+
ID: post.ID,
176
+
Author: post.Author,
177
+
})
178
+
179
+
return post, nil
180
+
}
181
+
182
+
func (b *PostgresBackend) getPostByUri(ctx context.Context, uri string, fields string) (*Post, error) {
183
+
puri, err := util.ParseAtUri(uri)
184
+
if err != nil {
185
+
return nil, err
186
+
}
187
+
188
+
r, err := b.getOrCreateRepo(ctx, puri.Did)
189
+
if err != nil {
190
+
return nil, err
191
+
}
192
+
193
+
q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?"
194
+
195
+
var post Post
196
+
if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil {
197
+
return nil, err
198
+
}
199
+
200
+
if post.ID == 0 {
201
+
post.Rkey = puri.Rkey
202
+
post.Author = r.ID
203
+
post.NotFound = true
204
+
205
+
if err := b.db.Session(&gorm.Session{
206
+
Logger: logger.Default.LogMode(logger.Silent),
207
+
}).Create(&post).Error; err != nil {
208
+
if !errors.Is(err, gorm.ErrDuplicatedKey) {
209
+
return nil, err
210
+
}
211
+
if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil {
212
+
return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
213
+
}
214
+
}
215
+
216
+
}
217
+
218
+
b.postInfoCache.Add(uri, cachedPostInfo{
219
+
ID: post.ID,
220
+
Author: post.Author,
221
+
})
222
+
223
+
return &post, nil
224
+
}
225
+
226
+
func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) {
227
+
lrev, ok := b.revCache.Get(rr.ID)
228
+
if ok {
229
+
return lrev, nil
230
+
}
231
+
232
+
var rev string
233
+
if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil {
234
+
if errors.Is(err, pgx.ErrNoRows) {
235
+
return "", nil
236
+
}
237
+
return "", err
238
+
}
239
+
240
+
if rev != "" {
241
+
b.revCache.Add(rr.ID, rev)
242
+
}
243
+
return rev, nil
244
+
}
245
+
246
+
func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error {
247
+
r, err := b.getOrCreateRepo(ctx, user)
248
+
if err != nil {
249
+
return err
250
+
}
251
+
252
+
var si SyncInfo
253
+
if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil {
254
+
return err
255
+
}
256
+
257
+
// not found
258
+
if si.Repo == 0 {
259
+
if err := b.db.Create(&SyncInfo{
260
+
Repo: r.ID,
261
+
}).Error; err != nil {
262
+
return err
263
+
}
264
+
}
265
+
266
+
if si.FollowsSynced {
267
+
return nil
268
+
}
269
+
270
+
var follows []Follow
271
+
var cursor string
272
+
for {
273
+
resp, err := atproto.RepoListRecords(ctx, b.s.client, "app.bsky.graph.follow", cursor, 100, b.s.mydid, false)
274
+
if err != nil {
275
+
return err
276
+
}
277
+
278
+
for _, rec := range resp.Records {
279
+
if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok {
280
+
fr, err := b.getOrCreateRepo(ctx, fol.Subject)
281
+
if err != nil {
282
+
return err
283
+
}
284
+
285
+
puri, err := syntax.ParseATURI(rec.Uri)
286
+
if err != nil {
287
+
return err
288
+
}
289
+
290
+
follows = append(follows, Follow{
291
+
Created: time.Now(),
292
+
Indexed: time.Now(),
293
+
Rkey: puri.RecordKey().String(),
294
+
Author: r.ID,
295
+
Subject: fr.ID,
296
+
})
297
+
}
298
+
}
299
+
300
+
if resp.Cursor == nil || len(resp.Records) == 0 {
301
+
break
302
+
}
303
+
cursor = *resp.Cursor
304
+
}
305
+
306
+
if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil {
307
+
return err
308
+
}
309
+
310
+
if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil {
311
+
return err
312
+
}
313
+
314
+
fmt.Println("Got follows: ", len(follows))
315
+
316
+
return nil
317
+
}
318
+
319
+
func (b *PostgresBackend) loadRelevantDids() error {
320
+
ctx := context.TODO()
321
+
322
+
if err := b.ensureFollowsScraped(ctx, b.s.mydid); err != nil {
323
+
return fmt.Errorf("failed to scrape follows: %w", err)
324
+
}
325
+
326
+
r, err := b.getOrCreateRepo(ctx, b.s.mydid)
327
+
if err != nil {
328
+
return err
329
+
}
330
+
331
+
var dids []string
332
+
if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil {
333
+
return err
334
+
}
335
+
336
+
b.relevantDids[b.s.mydid] = true
337
+
for _, d := range dids {
338
+
fmt.Println("adding did: ", d)
339
+
b.relevantDids[d] = true
340
+
}
341
+
342
+
return nil
343
+
}
344
+
345
+
type SyncInfo struct {
346
+
Repo uint `gorm:"index"`
347
+
FollowsSynced bool
348
+
Rev string
349
+
}
350
+
351
+
func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
352
+
var id uint
353
+
var notfound bool
354
+
if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, ¬found); err != nil {
355
+
if errors.Is(err, pgx.ErrNoRows) {
356
+
return false, nil
357
+
}
358
+
return false, err
359
+
}
360
+
361
+
if id != 0 && !notfound {
362
+
return true, nil
363
+
}
364
+
365
+
return false, nil
366
+
}
367
+
368
+
func (b *PostgresBackend) didIsRelevant(did string) bool {
369
+
b.rdLk.Lock()
370
+
defer b.rdLk.Unlock()
371
+
return b.relevantDids[did]
372
+
}
373
+
374
+
func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool {
375
+
for _, id := range idents {
376
+
if strings.HasPrefix(id, "did:") {
377
+
if b.didIsRelevant(id) {
378
+
return true
379
+
}
380
+
} else if strings.HasPrefix(id, "at://") {
381
+
puri, err := syntax.ParseATURI(id)
382
+
if err != nil {
383
+
continue
384
+
}
385
+
386
+
if b.didIsRelevant(puri.Authority().String()) {
387
+
return true
388
+
}
389
+
}
390
+
}
391
+
392
+
return false
393
+
}
394
+
395
+
func (b *PostgresBackend) getRepoByID(ctx context.Context, id uint) (*models.Repo, error) {
396
+
var r models.Repo
397
+
if err := b.db.Find(&r, "id = ?", id).Error; err != nil {
398
+
return nil, err
399
+
}
400
+
401
+
return &r, nil
402
+
}