A locally focused bluesky appview
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 75d5295fde853347d619d5c9eb5c40e8afe7b9a2 406 lines 8.7 kB view raw
1package main 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/jackc/pgx/v5" 15 "github.com/jackc/pgx/v5/pgconn" 16 "github.com/whyrusleeping/market/models" 17 "gorm.io/gorm" 18 "gorm.io/gorm/clause" 19 "gorm.io/gorm/logger" 20) 21 22func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) { 23 r, ok := b.repoCache.Get(did) 24 if !ok { 25 b.reposLk.Lock() 26 27 r, ok = b.repoCache.Get(did) 28 if !ok { 29 r = &Repo{} 30 r.Did = did 31 b.repoCache.Add(did, r) 32 } 33 34 b.reposLk.Unlock() 35 } 36 37 r.Lk.Lock() 38 defer r.Lk.Unlock() 39 if r.Setup { 40 return r, nil 41 } 42 43 row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did) 44 45 err := row.Scan(&r.ID, &r.CreatedAt, &r.Did) 46 if err == nil { 47 // found it! 48 r.Setup = true 49 return r, nil 50 } 51 52 if err != pgx.ErrNoRows { 53 return nil, err 54 } 55 56 r.Did = did 57 if err := b.db.Create(r).Error; err != nil { 58 return nil, err 59 } 60 61 r.Setup = true 62 63 return r, nil 64} 65 66func (b *PostgresBackend) getOrCreateList(ctx context.Context, uri string) (*List, error) { 67 puri, err := util.ParseAtUri(uri) 68 if err != nil { 69 return nil, err 70 } 71 72 r, err := b.getOrCreateRepo(ctx, puri.Did) 73 if err != nil { 74 return nil, err 75 } 76 77 // TODO: needs upsert treatment when we actually find the list 78 var list List 79 if err := b.db.FirstOrCreate(&list, map[string]any{ 80 "author": r.ID, 81 "rkey": puri.Rkey, 82 }).Error; err != nil { 83 return nil, err 84 } 85 return &list, nil 86} 87 88type cachedPostInfo struct { 89 ID uint 90 Author uint 91} 92 93func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) { 94 // getPostByUri implicitly fills the cache 95 p, err := b.postInfoForUri(ctx, uri) 96 if err != nil { 97 return 0, err 98 } 99 100 return p.ID, nil 101} 102 103func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) { 104 v, ok := b.postInfoCache.Get(uri) 105 if ok { 106 return v, nil 107 } 108 109 // getPostByUri implicitly fills the cache 110 p, err := b.getOrCreatePostBare(ctx, uri) 111 if err != nil { 112 return cachedPostInfo{}, err 113 } 114 115 return cachedPostInfo{ID: p.ID, Author: p.Author}, nil 116} 117 118func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) { 119 var p Post 120 q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2" 121 if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil { 122 if errors.Is(err, pgx.ErrNoRows) { 123 return nil, nil 124 } 125 return nil, err 126 } 127 128 return &p, nil 129} 130 131func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) { 132 puri, err := util.ParseAtUri(uri) 133 if err != nil { 134 return nil, err 135 } 136 137 r, err := b.getOrCreateRepo(ctx, puri.Did) 138 if err != nil { 139 return nil, err 140 } 141 142 post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey) 143 if err != nil { 144 return nil, err 145 } 146 147 if post == nil { 148 post = &Post{ 149 Rkey: puri.Rkey, 150 Author: r.ID, 151 NotFound: true, 152 } 153 154 err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID) 155 if err != nil { 156 pgErr, ok := err.(*pgconn.PgError) 157 if !ok || pgErr.Code != "23505" { 158 return nil, err 159 } 160 161 out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey) 162 if err != nil { 163 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err) 164 } 165 if out == nil { 166 return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey) 167 } 168 169 post = out 170 } 171 172 } 173 174 b.postInfoCache.Add(uri, cachedPostInfo{ 175 ID: post.ID, 176 Author: post.Author, 177 }) 178 179 return post, nil 180} 181 182func (b *PostgresBackend) getPostByUri(ctx context.Context, uri string, fields string) (*Post, error) { 183 puri, err := util.ParseAtUri(uri) 184 if err != nil { 185 return nil, err 186 } 187 188 r, err := b.getOrCreateRepo(ctx, puri.Did) 189 if err != nil { 190 return nil, err 191 } 192 193 q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?" 194 195 var post Post 196 if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil { 197 return nil, err 198 } 199 200 if post.ID == 0 { 201 post.Rkey = puri.Rkey 202 post.Author = r.ID 203 post.NotFound = true 204 205 if err := b.db.Session(&gorm.Session{ 206 Logger: logger.Default.LogMode(logger.Silent), 207 }).Create(&post).Error; err != nil { 208 if !errors.Is(err, gorm.ErrDuplicatedKey) { 209 return nil, err 210 } 211 if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil { 212 return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err) 213 } 214 } 215 216 } 217 218 b.postInfoCache.Add(uri, cachedPostInfo{ 219 ID: post.ID, 220 Author: post.Author, 221 }) 222 223 return &post, nil 224} 225 226func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) { 227 lrev, ok := b.revCache.Get(rr.ID) 228 if ok { 229 return lrev, nil 230 } 231 232 var rev string 233 if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil { 234 if errors.Is(err, pgx.ErrNoRows) { 235 return "", nil 236 } 237 return "", err 238 } 239 240 if rev != "" { 241 b.revCache.Add(rr.ID, rev) 242 } 243 return rev, nil 244} 245 246func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error { 247 r, err := b.getOrCreateRepo(ctx, user) 248 if err != nil { 249 return err 250 } 251 252 var si SyncInfo 253 if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil { 254 return err 255 } 256 257 // not found 258 if si.Repo == 0 { 259 if err := b.db.Create(&SyncInfo{ 260 Repo: r.ID, 261 }).Error; err != nil { 262 return err 263 } 264 } 265 266 if si.FollowsSynced { 267 return nil 268 } 269 270 var follows []Follow 271 var cursor string 272 for { 273 resp, err := atproto.RepoListRecords(ctx, b.s.client, "app.bsky.graph.follow", cursor, 100, b.s.mydid, false) 274 if err != nil { 275 return err 276 } 277 278 for _, rec := range resp.Records { 279 if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok { 280 fr, err := b.getOrCreateRepo(ctx, fol.Subject) 281 if err != nil { 282 return err 283 } 284 285 puri, err := syntax.ParseATURI(rec.Uri) 286 if err != nil { 287 return err 288 } 289 290 follows = append(follows, Follow{ 291 Created: time.Now(), 292 Indexed: time.Now(), 293 Rkey: puri.RecordKey().String(), 294 Author: r.ID, 295 Subject: fr.ID, 296 }) 297 } 298 } 299 300 if resp.Cursor == nil || len(resp.Records) == 0 { 301 break 302 } 303 cursor = *resp.Cursor 304 } 305 306 if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil { 307 return err 308 } 309 310 if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil { 311 return err 312 } 313 314 fmt.Println("Got follows: ", len(follows)) 315 316 return nil 317} 318 319func (b *PostgresBackend) loadRelevantDids() error { 320 ctx := context.TODO() 321 322 if err := b.ensureFollowsScraped(ctx, b.s.mydid); err != nil { 323 return fmt.Errorf("failed to scrape follows: %w", err) 324 } 325 326 r, err := b.getOrCreateRepo(ctx, b.s.mydid) 327 if err != nil { 328 return err 329 } 330 331 var dids []string 332 if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil { 333 return err 334 } 335 336 b.relevantDids[b.s.mydid] = true 337 for _, d := range dids { 338 fmt.Println("adding did: ", d) 339 b.relevantDids[d] = true 340 } 341 342 return nil 343} 344 345type SyncInfo struct { 346 Repo uint `gorm:"index"` 347 FollowsSynced bool 348 Rev string 349} 350 351func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) { 352 var id uint 353 var notfound bool 354 if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, &notfound); err != nil { 355 if errors.Is(err, pgx.ErrNoRows) { 356 return false, nil 357 } 358 return false, err 359 } 360 361 if id != 0 && !notfound { 362 return true, nil 363 } 364 365 return false, nil 366} 367 368func (b *PostgresBackend) didIsRelevant(did string) bool { 369 b.rdLk.Lock() 370 defer b.rdLk.Unlock() 371 return b.relevantDids[did] 372} 373 374func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool { 375 for _, id := range idents { 376 if strings.HasPrefix(id, "did:") { 377 if b.didIsRelevant(id) { 378 return true 379 } 380 } else if strings.HasPrefix(id, "at://") { 381 puri, err := syntax.ParseATURI(id) 382 if err != nil { 383 continue 384 } 385 386 if b.didIsRelevant(puri.Authority().String()) { 387 return true 388 } 389 } 390 } 391 392 return false 393} 394 395func (b *PostgresBackend) getRepoByID(ctx context.Context, id uint) (*models.Repo, error) { 396 var r models.Repo 397 if err := b.db.Find(&r, "id = ?", id).Error; err != nil { 398 return nil, err 399 } 400 401 return &r, nil 402} 403 404func (b *PostgresBackend) TrackMissingActor(did string) { 405 b.s.addMissingProfile(context.TODO(), did) 406}