A locally focused bluesky appview
at master 12 kB view raw
1package backend 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "strings" 8 "sync" 9 "time" 10 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/identity" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/util" 16 "github.com/bluesky-social/indigo/xrpc" 17 lru "github.com/hashicorp/golang-lru/v2" 18 "github.com/jackc/pgx/v5" 19 "github.com/jackc/pgx/v5/pgconn" 20 "github.com/jackc/pgx/v5/pgxpool" 21 . "github.com/whyrusleeping/konbini/models" 22 "github.com/whyrusleeping/market/models" 23 "gorm.io/gorm" 24 "gorm.io/gorm/clause" 25 "gorm.io/gorm/logger" 26) 27 28// PostgresBackend handles database operations 29type PostgresBackend struct { 30 db *gorm.DB 31 pgx *pgxpool.Pool 32 33 dir identity.Directory 34 35 client *xrpc.Client 36 37 mydid string 38 myrepo *models.Repo 39 40 relevantDids map[string]bool 41 rdLk sync.Mutex 42 43 revCache *lru.TwoQueueCache[uint, string] 44 45 repoCache *lru.TwoQueueCache[string, *Repo] 46 reposLk sync.Mutex 47 48 didByIDCache *lru.TwoQueueCache[uint, string] 49 50 postInfoCache *lru.TwoQueueCache[string, cachedPostInfo] 51 52 missingRecords chan MissingRecord 53} 54 55type cachedPostInfo struct { 56 ID uint 57 Author uint 58} 59 60// NewPostgresBackend creates a new PostgresBackend 61func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, dir identity.Directory) (*PostgresBackend, error) { 62 rc, _ := lru.New2Q[string, *Repo](1_000_000) 63 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 64 revc, _ := lru.New2Q[uint, string](1_000_000) 65 dbic, _ := lru.New2Q[uint, string](1_000_000) 66 67 b := &PostgresBackend{ 68 client: client, 69 mydid: mydid, 70 db: db, 71 pgx: pgx, 72 relevantDids: make(map[string]bool), 73 repoCache: rc, 74 postInfoCache: pc, 75 revCache: revc, 76 didByIDCache: dbic, 77 dir: dir, 78 79 missingRecords: make(chan MissingRecord, 1000), 80 } 81 82 r, err := b.GetOrCreateRepo(context.TODO(), mydid) 83 if err != nil { 84 return nil, err 85 } 86 87 b.myrepo = r 88 89 go b.missingRecordFetcher() 90 return b, nil 91} 92 93// TrackMissingRecord implements the RecordTracker interface 94func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) { 95 mr := MissingRecord{ 96 Type: mrTypeFromIdent(identifier), 97 Identifier: identifier, 98 Wait: wait, 99 } 100 101 b.addMissingRecord(context.TODO(), mr) 102} 103 104func mrTypeFromIdent(ident string) MissingRecordType { 105 if strings.HasPrefix(ident, "did:") { 106 return MissingRecordTypeProfile 107 } 108 109 puri, _ := syntax.ParseATURI(ident) 110 switch puri.Collection().String() { 111 case "app.bsky.feed.post": 112 return MissingRecordTypePost 113 case "app.bsky.feed.generator": 114 return MissingRecordTypeFeedGenerator 115 default: 116 return MissingRecordTypeUnknown 117 } 118 119} 120 121// DidToID converts a DID to a database ID 122func (b *PostgresBackend) DidToID(ctx context.Context, did string) (uint, error) { 123 r, err := b.GetOrCreateRepo(ctx, did) 124 if err != nil { 125 return 0, err 126 } 127 return r.ID, nil 128} 129 130func (b *PostgresBackend) GetOrCreateRepo(ctx context.Context, did string) (*Repo, error) { 131 r, ok := b.repoCache.Get(did) 132 if !ok { 133 b.reposLk.Lock() 134 135 r, ok = b.repoCache.Get(did) 136 if !ok { 137 r = &Repo{} 138 r.Did = did 139 b.repoCache.Add(did, r) 140 } 141 142 b.reposLk.Unlock() 143 } 144 145 r.Lk.Lock() 146 defer r.Lk.Unlock() 147 if r.Setup { 148 return r, nil 149 } 150 151 row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did) 152 153 err := row.Scan(&r.ID, &r.CreatedAt, &r.Did) 154 if err == nil { 155 // found it! 156 r.Setup = true 157 return r, nil 158 } 159 160 if err != pgx.ErrNoRows { 161 return nil, err 162 } 163 164 r.Did = did 165 if err := b.db.Create(r).Error; err != nil { 166 return nil, err 167 } 168 169 r.Setup = true 170 171 return r, nil 172} 173 174func (b *PostgresBackend) GetOrCreateList(ctx context.Context, uri string) (*List, error) { 175 puri, err := util.ParseAtUri(uri) 176 if err != nil { 177 return nil, err 178 } 179 180 r, err := b.GetOrCreateRepo(ctx, puri.Did) 181 if err != nil { 182 return nil, err 183 } 184 185 // TODO: needs upsert treatment when we actually find the list 186 var list List 187 if err := b.db.FirstOrCreate(&list, map[string]any{ 188 "author": r.ID, 189 "rkey": puri.Rkey, 190 }).Error; err != nil { 191 return nil, err 192 } 193 return &list, nil 194} 195 196func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) { 197 // getPostByUri implicitly fills the cache 198 p, err := b.postInfoForUri(ctx, uri) 199 if err != nil { 200 return 0, err 201 } 202 203 return p.ID, nil 204} 205 206func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) { 207 v, ok := b.postInfoCache.Get(uri) 208 if ok { 209 return v, nil 210 } 211 212 // getPostByUri implicitly fills the cache 213 p, err := b.getOrCreatePostBare(ctx, uri) 214 if err != nil { 215 return cachedPostInfo{}, err 216 } 217 218 return cachedPostInfo{ID: p.ID, Author: p.Author}, nil 219} 220 221func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) { 222 var p Post 223 q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2" 224 if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil { 225 if errors.Is(err, pgx.ErrNoRows) { 226 return nil, nil 227 } 228 return nil, err 229 } 230 231 return &p, nil 232} 233 234func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) { 235 puri, err := util.ParseAtUri(uri) 236 if err != nil { 237 return nil, err 238 } 239 240 r, err := b.GetOrCreateRepo(ctx, puri.Did) 241 if err != nil { 242 return nil, err 243 } 244 245 post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey) 246 if err != nil { 247 return nil, err 248 } 249 250 if post == nil { 251 post = &Post{ 252 Rkey: puri.Rkey, 253 Author: r.ID, 254 NotFound: true, 255 } 256 257 err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID) 258 if err != nil { 259 pgErr, ok := err.(*pgconn.PgError) 260 if !ok || pgErr.Code != "23505" { 261 return nil, err 262 } 263 264 out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey) 265 if err != nil { 266 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err) 267 } 268 if out == nil { 269 return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey) 270 } 271 272 post = out 273 } 274 275 } 276 277 b.postInfoCache.Add(uri, cachedPostInfo{ 278 ID: post.ID, 279 Author: post.Author, 280 }) 281 282 return post, nil 283} 284 285func (b *PostgresBackend) GetPostByUri(ctx context.Context, uri string, fields string) (*Post, error) { 286 puri, err := util.ParseAtUri(uri) 287 if err != nil { 288 return nil, err 289 } 290 291 r, err := b.GetOrCreateRepo(ctx, puri.Did) 292 if err != nil { 293 return nil, err 294 } 295 296 q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?" 297 298 var post Post 299 if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil { 300 return nil, err 301 } 302 303 if post.ID == 0 { 304 post.Rkey = puri.Rkey 305 post.Author = r.ID 306 post.NotFound = true 307 308 if err := b.db.Session(&gorm.Session{ 309 Logger: logger.Default.LogMode(logger.Silent), 310 }).Create(&post).Error; err != nil { 311 if !errors.Is(err, gorm.ErrDuplicatedKey) { 312 return nil, err 313 } 314 if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil { 315 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err) 316 } 317 } 318 319 } 320 321 b.postInfoCache.Add(uri, cachedPostInfo{ 322 ID: post.ID, 323 Author: post.Author, 324 }) 325 326 return &post, nil 327} 328 329func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) { 330 lrev, ok := b.revCache.Get(rr.ID) 331 if ok { 332 return lrev, nil 333 } 334 335 var rev string 336 if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil { 337 if errors.Is(err, pgx.ErrNoRows) { 338 return "", nil 339 } 340 return "", err 341 } 342 343 if rev != "" { 344 b.revCache.Add(rr.ID, rev) 345 } 346 return rev, nil 347} 348 349func (b *PostgresBackend) AddRelevantDid(did string) { 350 b.rdLk.Lock() 351 defer b.rdLk.Unlock() 352 b.relevantDids[did] = true 353} 354 355func (b *PostgresBackend) DidIsRelevant(did string) bool { 356 b.rdLk.Lock() 357 defer b.rdLk.Unlock() 358 return b.relevantDids[did] 359} 360 361func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool { 362 for _, id := range idents { 363 if strings.HasPrefix(id, "did:") { 364 if b.DidIsRelevant(id) { 365 return true 366 } 367 } else if strings.HasPrefix(id, "at://") { 368 puri, err := syntax.ParseATURI(id) 369 if err != nil { 370 continue 371 } 372 373 if b.DidIsRelevant(puri.Authority().String()) { 374 return true 375 } 376 } 377 } 378 379 return false 380} 381 382func (b *PostgresBackend) GetRelevantDids() []string { 383 b.rdLk.Lock() 384 var out []string 385 for k := range b.relevantDids { 386 out = append(out, k) 387 } 388 b.rdLk.Unlock() 389 return out 390} 391 392func (b *PostgresBackend) GetRepoByID(ctx context.Context, id uint) (*models.Repo, error) { 393 var r models.Repo 394 if err := b.db.Find(&r, "id = ?", id).Error; err != nil { 395 return nil, err 396 } 397 398 return &r, nil 399} 400 401func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) { 402 val, ok := b.didByIDCache.Get(uid) 403 if ok { 404 return val, nil 405 } 406 407 r, err := b.GetRepoByID(ctx, uid) 408 if err != nil { 409 return "", err 410 } 411 412 b.didByIDCache.Add(uid, r.Did) 413 return r.Did, nil 414} 415 416func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) { 417 var id uint 418 var notfound bool 419 if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, &notfound); err != nil { 420 if errors.Is(err, pgx.ErrNoRows) { 421 return false, nil 422 } 423 return false, err 424 } 425 426 if id != 0 && !notfound { 427 return true, nil 428 } 429 430 return false, nil 431} 432 433func (b *PostgresBackend) LoadRelevantDids() error { 434 ctx := context.TODO() 435 436 if err := b.ensureFollowsScraped(ctx, b.mydid); err != nil { 437 return fmt.Errorf("failed to scrape follows: %w", err) 438 } 439 440 r, err := b.GetOrCreateRepo(ctx, b.mydid) 441 if err != nil { 442 return err 443 } 444 445 var dids []string 446 if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil { 447 return err 448 } 449 450 b.relevantDids[b.mydid] = true 451 for _, d := range dids { 452 fmt.Println("adding did: ", d) 453 b.relevantDids[d] = true 454 } 455 456 return nil 457} 458 459type SyncInfo struct { 460 Repo uint `gorm:"index"` 461 FollowsSynced bool 462 Rev string 463} 464 465func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error { 466 r, err := b.GetOrCreateRepo(ctx, user) 467 if err != nil { 468 return err 469 } 470 471 var si SyncInfo 472 if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil { 473 return err 474 } 475 476 // not found 477 if si.Repo == 0 { 478 if err := b.db.Create(&SyncInfo{ 479 Repo: r.ID, 480 }).Error; err != nil { 481 return err 482 } 483 } 484 485 if si.FollowsSynced { 486 return nil 487 } 488 489 var follows []Follow 490 var cursor string 491 for { 492 resp, err := atproto.RepoListRecords(ctx, b.client, "app.bsky.graph.follow", cursor, 100, b.mydid, false) 493 if err != nil { 494 return err 495 } 496 497 for _, rec := range resp.Records { 498 if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok { 499 fr, err := b.GetOrCreateRepo(ctx, fol.Subject) 500 if err != nil { 501 return err 502 } 503 504 puri, err := syntax.ParseATURI(rec.Uri) 505 if err != nil { 506 return err 507 } 508 509 follows = append(follows, Follow{ 510 Created: time.Now(), 511 Indexed: time.Now(), 512 Rkey: puri.RecordKey().String(), 513 Author: r.ID, 514 Subject: fr.ID, 515 }) 516 } 517 } 518 519 if resp.Cursor == nil || len(resp.Records) == 0 { 520 break 521 } 522 cursor = *resp.Cursor 523 } 524 525 if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil { 526 return err 527 } 528 529 if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil { 530 return err 531 } 532 533 fmt.Println("Got follows: ", len(follows)) 534 535 return nil 536}