A locally focused bluesky appview
1package backend
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "sync"
9 "time"
10
11 "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/api/bsky"
13 "github.com/bluesky-social/indigo/atproto/identity"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/util"
16 "github.com/bluesky-social/indigo/xrpc"
17 lru "github.com/hashicorp/golang-lru/v2"
18 "github.com/jackc/pgx/v5"
19 "github.com/jackc/pgx/v5/pgconn"
20 "github.com/jackc/pgx/v5/pgxpool"
21 . "github.com/whyrusleeping/konbini/models"
22 "github.com/whyrusleeping/market/models"
23 "gorm.io/gorm"
24 "gorm.io/gorm/clause"
25 "gorm.io/gorm/logger"
26)
27
28// PostgresBackend handles database operations
29type PostgresBackend struct {
30 db *gorm.DB
31 pgx *pgxpool.Pool
32
33 dir identity.Directory
34
35 client *xrpc.Client
36
37 mydid string
38 myrepo *models.Repo
39
40 relevantDids map[string]bool
41 rdLk sync.Mutex
42
43 revCache *lru.TwoQueueCache[uint, string]
44
45 repoCache *lru.TwoQueueCache[string, *Repo]
46 reposLk sync.Mutex
47
48 didByIDCache *lru.TwoQueueCache[uint, string]
49
50 postInfoCache *lru.TwoQueueCache[string, cachedPostInfo]
51
52 missingRecords chan MissingRecord
53}
54
// cachedPostInfo is the value stored in postInfoCache: a post's database ID
// together with the database ID of its author.
type cachedPostInfo struct {
	ID, Author uint
}
59
60// NewPostgresBackend creates a new PostgresBackend
61func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) {
62 rc, _ := lru.New2Q[string, *Repo](1_000_000)
63 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000)
64 revc, _ := lru.New2Q[uint, string](1_000_000)
65 dbic, _ := lru.New2Q[uint, string](1_000_000)
66
67 b := &PostgresBackend{
68 client: client,
69 mydid: mydid,
70 db: db,
71 pgx: pgx,
72 relevantDids: make(map[string]bool),
73 repoCache: rc,
74 postInfoCache: pc,
75 revCache: revc,
76 didByIDCache: dbic,
77 dir: dir,
78
79 missingRecords: make(chan MissingRecord, 1000),
80 }
81
82 r, err := b.GetOrCreateRepo(context.TODO(), mydid)
83 if err != nil {
84 return nil, err
85 }
86
87 b.myrepo = r
88
89 go b.missingRecordFetcher()
90 return b, nil
91}
92
93// TrackMissingRecord implements the RecordTracker interface
94func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) {
95 mr := MissingRecord{
96 Type: mrTypeFromIdent(identifier),
97 Identifier: identifier,
98 Wait: wait,
99 }
100
101 b.addMissingRecord(context.TODO(), mr)
102}
103
104func mrTypeFromIdent(ident string) MissingRecordType {
105 if strings.HasPrefix(ident, "did:") {
106 return MissingRecordTypeProfile
107 }
108
109 puri, _ := syntax.ParseATURI(ident)
110 switch puri.Collection().String() {
111 case "app.bsky.feed.post":
112 return MissingRecordTypePost
113 case "app.bsky.feed.generator":
114 return MissingRecordTypeFeedGenerator
115 default:
116 return MissingRecordTypeUnknown
117 }
118
119}
120
121// DidToID converts a DID to a database ID
122func (b *PostgresBackend) DidToID(ctx context.Context, did string) (uint, error) {
123 r, err := b.GetOrCreateRepo(ctx, did)
124 if err != nil {
125 return 0, err
126 }
127 return r.ID, nil
128}
129
130func (b *PostgresBackend) GetOrCreateRepo(ctx context.Context, did string) (*Repo, error) {
131 r, ok := b.repoCache.Get(did)
132 if !ok {
133 b.reposLk.Lock()
134
135 r, ok = b.repoCache.Get(did)
136 if !ok {
137 r = &Repo{}
138 r.Did = did
139 b.repoCache.Add(did, r)
140 }
141
142 b.reposLk.Unlock()
143 }
144
145 r.Lk.Lock()
146 defer r.Lk.Unlock()
147 if r.Setup {
148 return r, nil
149 }
150
151 row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did)
152
153 err := row.Scan(&r.ID, &r.CreatedAt, &r.Did)
154 if err == nil {
155 // found it!
156 r.Setup = true
157 return r, nil
158 }
159
160 if err != pgx.ErrNoRows {
161 return nil, err
162 }
163
164 r.Did = did
165 if err := b.db.Create(r).Error; err != nil {
166 return nil, err
167 }
168
169 r.Setup = true
170
171 return r, nil
172}
173
174func (b *PostgresBackend) GetOrCreateList(ctx context.Context, uri string) (*List, error) {
175 puri, err := util.ParseAtUri(uri)
176 if err != nil {
177 return nil, err
178 }
179
180 r, err := b.GetOrCreateRepo(ctx, puri.Did)
181 if err != nil {
182 return nil, err
183 }
184
185 // TODO: needs upsert treatment when we actually find the list
186 var list List
187 if err := b.db.FirstOrCreate(&list, map[string]any{
188 "author": r.ID,
189 "rkey": puri.Rkey,
190 }).Error; err != nil {
191 return nil, err
192 }
193 return &list, nil
194}
195
196func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) {
197 // getPostByUri implicitly fills the cache
198 p, err := b.postInfoForUri(ctx, uri)
199 if err != nil {
200 return 0, err
201 }
202
203 return p.ID, nil
204}
205
206func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) {
207 v, ok := b.postInfoCache.Get(uri)
208 if ok {
209 return v, nil
210 }
211
212 // getPostByUri implicitly fills the cache
213 p, err := b.getOrCreatePostBare(ctx, uri)
214 if err != nil {
215 return cachedPostInfo{}, err
216 }
217
218 return cachedPostInfo{ID: p.ID, Author: p.Author}, nil
219}
220
221func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) {
222 var p Post
223 q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2"
224 if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil {
225 if errors.Is(err, pgx.ErrNoRows) {
226 return nil, nil
227 }
228 return nil, err
229 }
230
231 return &p, nil
232}
233
234func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) {
235 puri, err := util.ParseAtUri(uri)
236 if err != nil {
237 return nil, err
238 }
239
240 r, err := b.GetOrCreateRepo(ctx, puri.Did)
241 if err != nil {
242 return nil, err
243 }
244
245 post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
246 if err != nil {
247 return nil, err
248 }
249
250 if post == nil {
251 post = &Post{
252 Rkey: puri.Rkey,
253 Author: r.ID,
254 NotFound: true,
255 }
256
257 err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID)
258 if err != nil {
259 pgErr, ok := err.(*pgconn.PgError)
260 if !ok || pgErr.Code != "23505" {
261 return nil, err
262 }
263
264 out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey)
265 if err != nil {
266 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
267 }
268 if out == nil {
269 return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey)
270 }
271
272 post = out
273 }
274
275 }
276
277 b.postInfoCache.Add(uri, cachedPostInfo{
278 ID: post.ID,
279 Author: post.Author,
280 })
281
282 return post, nil
283}
284
285func (b *PostgresBackend) GetPostByUri(ctx context.Context, uri string, fields string) (*Post, error) {
286 puri, err := util.ParseAtUri(uri)
287 if err != nil {
288 return nil, err
289 }
290
291 r, err := b.GetOrCreateRepo(ctx, puri.Did)
292 if err != nil {
293 return nil, err
294 }
295
296 q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?"
297
298 var post Post
299 if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil {
300 return nil, err
301 }
302
303 if post.ID == 0 {
304 post.Rkey = puri.Rkey
305 post.Author = r.ID
306 post.NotFound = true
307
308 if err := b.db.Session(&gorm.Session{
309 Logger: logger.Default.LogMode(logger.Silent),
310 }).Create(&post).Error; err != nil {
311 if !errors.Is(err, gorm.ErrDuplicatedKey) {
312 return nil, err
313 }
314 if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil {
315 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err)
316 }
317 }
318
319 }
320
321 b.postInfoCache.Add(uri, cachedPostInfo{
322 ID: post.ID,
323 Author: post.Author,
324 })
325
326 return &post, nil
327}
328
329func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) {
330 lrev, ok := b.revCache.Get(rr.ID)
331 if ok {
332 return lrev, nil
333 }
334
335 var rev string
336 if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil {
337 if errors.Is(err, pgx.ErrNoRows) {
338 return "", nil
339 }
340 return "", err
341 }
342
343 if rev != "" {
344 b.revCache.Add(rr.ID, rev)
345 }
346 return rev, nil
347}
348
349func (b *PostgresBackend) AddRelevantDid(did string) {
350 b.rdLk.Lock()
351 defer b.rdLk.Unlock()
352 b.relevantDids[did] = true
353}
354
355func (b *PostgresBackend) DidIsRelevant(did string) bool {
356 b.rdLk.Lock()
357 defer b.rdLk.Unlock()
358 return b.relevantDids[did]
359}
360
361func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool {
362 for _, id := range idents {
363 if strings.HasPrefix(id, "did:") {
364 if b.DidIsRelevant(id) {
365 return true
366 }
367 } else if strings.HasPrefix(id, "at://") {
368 puri, err := syntax.ParseATURI(id)
369 if err != nil {
370 continue
371 }
372
373 if b.DidIsRelevant(puri.Authority().String()) {
374 return true
375 }
376 }
377 }
378
379 return false
380}
381
382func (b *PostgresBackend) GetRelevantDids() []string {
383 b.rdLk.Lock()
384 var out []string
385 for k := range b.relevantDids {
386 out = append(out, k)
387 }
388 b.rdLk.Unlock()
389 return out
390}
391
392func (b *PostgresBackend) GetRepoByID(ctx context.Context, id uint) (*models.Repo, error) {
393 var r models.Repo
394 if err := b.db.Find(&r, "id = ?", id).Error; err != nil {
395 return nil, err
396 }
397
398 return &r, nil
399}
400
401func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) {
402 val, ok := b.didByIDCache.Get(uid)
403 if ok {
404 return val, nil
405 }
406
407 r, err := b.GetRepoByID(ctx, uid)
408 if err != nil {
409 return "", err
410 }
411
412 b.didByIDCache.Add(uid, r.Did)
413 return r.Did, nil
414}
415
416func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
417 var id uint
418 var notfound bool
419 if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, ¬found); err != nil {
420 if errors.Is(err, pgx.ErrNoRows) {
421 return false, nil
422 }
423 return false, err
424 }
425
426 if id != 0 && !notfound {
427 return true, nil
428 }
429
430 return false, nil
431}
432
433func (b *PostgresBackend) LoadRelevantDids() error {
434 ctx := context.TODO()
435
436 if err := b.ensureFollowsScraped(ctx, b.mydid); err != nil {
437 return fmt.Errorf("failed to scrape follows: %w", err)
438 }
439
440 r, err := b.GetOrCreateRepo(ctx, b.mydid)
441 if err != nil {
442 return err
443 }
444
445 var dids []string
446 if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil {
447 return err
448 }
449
450 b.relevantDids[b.mydid] = true
451 for _, d := range dids {
452 fmt.Println("adding did: ", d)
453 b.relevantDids[d] = true
454 }
455
456 return nil
457}
458
// SyncInfo records per-repo synchronization state.
type SyncInfo struct {
	// Repo is the database ID of the repo this row describes (indexed).
	Repo uint `gorm:"index"`

	// FollowsSynced is set once the repo's follow records have been scraped
	// and stored.
	FollowsSynced bool

	// Rev is the last known rev for the repo; empty when none is recorded.
	Rev string
}
464
465func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error {
466 r, err := b.GetOrCreateRepo(ctx, user)
467 if err != nil {
468 return err
469 }
470
471 var si SyncInfo
472 if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil {
473 return err
474 }
475
476 // not found
477 if si.Repo == 0 {
478 if err := b.db.Create(&SyncInfo{
479 Repo: r.ID,
480 }).Error; err != nil {
481 return err
482 }
483 }
484
485 if si.FollowsSynced {
486 return nil
487 }
488
489 var follows []Follow
490 var cursor string
491 for {
492 resp, err := atproto.RepoListRecords(ctx, b.client, "app.bsky.graph.follow", cursor, 100, b.mydid, false)
493 if err != nil {
494 return err
495 }
496
497 for _, rec := range resp.Records {
498 if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok {
499 fr, err := b.GetOrCreateRepo(ctx, fol.Subject)
500 if err != nil {
501 return err
502 }
503
504 puri, err := syntax.ParseATURI(rec.Uri)
505 if err != nil {
506 return err
507 }
508
509 follows = append(follows, Follow{
510 Created: time.Now(),
511 Indexed: time.Now(),
512 Rkey: puri.RecordKey().String(),
513 Author: r.ID,
514 Subject: fr.ID,
515 })
516 }
517 }
518
519 if resp.Cursor == nil || len(resp.Records) == 0 {
520 break
521 }
522 cursor = *resp.Cursor
523 }
524
525 if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil {
526 return err
527 }
528
529 if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil {
530 return err
531 }
532
533 fmt.Println("Got follows: ", len(follows))
534
535 return nil
536}