A locally focused bluesky appview

finish backend refactor

+153 -27
backend/backend.go
··· 6 6 "fmt" 7 7 "strings" 8 8 "sync" 9 + "time" 9 10 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/api/bsky" 10 13 "github.com/bluesky-social/indigo/atproto/syntax" 11 14 "github.com/bluesky-social/indigo/util" 15 + "github.com/bluesky-social/indigo/xrpc" 12 16 lru "github.com/hashicorp/golang-lru/v2" 13 17 "github.com/jackc/pgx/v5" 14 18 "github.com/jackc/pgx/v5/pgconn" ··· 16 20 . "github.com/whyrusleeping/konbini/models" 17 21 "github.com/whyrusleeping/market/models" 18 22 "gorm.io/gorm" 23 + "gorm.io/gorm/clause" 19 24 "gorm.io/gorm/logger" 20 25 ) 21 26 ··· 25 30 pgx *pgxpool.Pool 26 31 tracker RecordTracker 27 32 33 + client *xrpc.Client 34 + 35 + mydid string 36 + myrepo *models.Repo 37 + 28 38 relevantDids map[string]bool 29 39 rdLk sync.Mutex 30 40 ··· 42 52 } 43 53 44 54 // NewPostgresBackend creates a new PostgresBackend 45 - func NewPostgresBackend(db *gorm.DB, pgx *pgxpool.Pool, tracker RecordTracker) (*PostgresBackend, error) { 46 - rc, err := lru.New2Q[string, *Repo](1_000_000) 47 - if err != nil { 48 - return nil, err 49 - } 50 - pc, err := lru.New2Q[string, cachedPostInfo](1_000_000) 51 - if err != nil { 52 - return nil, err 53 - } 54 - revc, err := lru.New2Q[uint, string](1_000_000) 55 - if err != nil { 56 - return nil, err 57 - } 55 + func NewPostgresBackend(mydid string, db *gorm.DB, pgx *pgxpool.Pool, client *xrpc.Client, tracker RecordTracker) (*PostgresBackend, error) { 56 + rc, _ := lru.New2Q[string, *Repo](1_000_000) 57 + pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 58 + revc, _ := lru.New2Q[uint, string](1_000_000) 58 59 59 - return &PostgresBackend{ 60 + b := &PostgresBackend{ 61 + client: client, 62 + mydid: mydid, 60 63 db: db, 61 64 pgx: pgx, 62 65 tracker: tracker, ··· 64 67 repoCache: rc, 65 68 postInfoCache: pc, 66 69 revCache: revc, 67 - }, nil 70 + } 71 + 72 + r, err := b.GetOrCreateRepo(context.TODO(), mydid) 73 + if err != nil { 74 + return nil, err 75 + } 76 + 77 + b.myrepo = r 78 + return b, nil 68 79 } 69 80 70 81 // TrackMissingRecord implements the RecordTracker interface ··· 76 87 77 88 // DidToID converts a DID to a database ID 78 89 func (b *PostgresBackend) DidToID(ctx context.Context, did string) (uint, error) { 79 - r, err := b.getOrCreateRepo(ctx, did) 90 + r, err := b.GetOrCreateRepo(ctx, did) 80 91 if err != nil { 81 92 return 0, err 82 93 } 83 94 return r.ID, nil 84 95 } 85 96 86 - func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) { 97 + func (b *PostgresBackend) GetOrCreateRepo(ctx context.Context, did string) (*Repo, error) { 87 98 r, ok := b.repoCache.Get(did) 88 99 if !ok { 89 100 b.reposLk.Lock() ··· 127 138 return r, nil 128 139 } 129 140 130 - func (b *PostgresBackend) getOrCreateList(ctx context.Context, uri string) (*List, error) { 141 + func (b *PostgresBackend) GetOrCreateList(ctx context.Context, uri string) (*List, error) { 131 142 puri, err := util.ParseAtUri(uri) 132 143 if err != nil { 133 144 return nil, err 134 145 } 135 146 136 - r, err := b.getOrCreateRepo(ctx, puri.Did) 147 + r, err := b.GetOrCreateRepo(ctx, puri.Did) 137 148 if err != nil { 138 149 return nil, err 139 150 } ··· 193 204 return nil, err 194 205 } 195 206 196 - r, err := b.getOrCreateRepo(ctx, puri.Did) 207 + r, err := b.GetOrCreateRepo(ctx, puri.Did) 197 208 if err != nil { 198 209 return nil, err 199 210 } ··· 238 249 return post, nil 239 250 } 240 251 241 - func (b *PostgresBackend) getPostByUri(ctx context.Context, uri string, fields string) (*Post, error) { 252 + func (b *PostgresBackend) GetPostByUri(ctx context.Context, uri string, fields string) (*Post, error) { 242 253 puri, err := util.ParseAtUri(uri) 243 254 if err != nil { 244 255 return nil, err 245 256 } 246 257 247 - r, err := b.getOrCreateRepo(ctx, puri.Did) 258 + r, err := b.GetOrCreateRepo(ctx, puri.Did) 248 259 if err != nil { 249 260 return nil, err 250 261 } ··· 302 313 return rev, nil 303 314 } 304 315 305 - func (b *PostgresBackend) addRelevantDid(did string) { 316 + func (b *PostgresBackend) AddRelevantDid(did string) { 306 317 b.rdLk.Lock() 307 318 defer b.rdLk.Unlock() 308 319 b.relevantDids[did] = true 309 320 } 310 321 311 - func (b *PostgresBackend) didIsRelevant(did string) bool { 322 + func (b *PostgresBackend) DidIsRelevant(did string) bool { 312 323 b.rdLk.Lock() 313 324 defer b.rdLk.Unlock() 314 325 return b.relevantDids[did] ··· 317 328 func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool { 318 329 for _, id := range idents { 319 330 if strings.HasPrefix(id, "did:") { 320 - if b.didIsRelevant(id) { 331 + if b.DidIsRelevant(id) { 321 332 return true 322 333 } 323 334 } else if strings.HasPrefix(id, "at://") { ··· 326 337 continue 327 338 } 328 339 329 - if b.didIsRelevant(puri.Authority().String()) { 340 + if b.DidIsRelevant(puri.Authority().String()) { 330 341 return true 331 342 } 332 343 } ··· 335 346 return false 336 347 } 337 348 338 - func (b *PostgresBackend) getRepoByID(ctx context.Context, id uint) (*models.Repo, error) { 349 + func (b *PostgresBackend) GetRelevantDids() []string { 350 + b.rdLk.Lock() 351 + var out []string 352 + for k := range b.relevantDids { 353 + out = append(out, k) 354 + } 355 + b.rdLk.Unlock() 356 + return out 357 + } 358 + 359 + func (b *PostgresBackend) GetRepoByID(ctx context.Context, id uint) (*models.Repo, error) { 339 360 var r models.Repo 340 361 if err := b.db.Find(&r, "id = ?", id).Error; err != nil { 341 362 return nil, err ··· 360 381 361 382 return false, nil 362 383 } 384 + 385 + func (b *PostgresBackend) LoadRelevantDids() error { 386 + ctx := context.TODO() 387 + 388 + if err := b.ensureFollowsScraped(ctx, b.mydid); err != nil { 389 + return fmt.Errorf("failed to scrape follows: %w", err) 390 + } 391 + 392 + r, err := b.GetOrCreateRepo(ctx, b.mydid) 393 + if err != nil { 394 + return err 395 + } 396 + 397 + var dids []string 398 + if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil { 399 + return err 400 + } 401 + 402 + b.relevantDids[b.mydid] = true 403 + for _, d := range dids { 404 + fmt.Println("adding did: ", d) 405 + b.relevantDids[d] = true 406 + } 407 + 408 + return nil 409 + } 410 + 411 + type SyncInfo struct { 412 + Repo uint `gorm:"index"` 413 + FollowsSynced bool 414 + Rev string 415 + } 416 + 417 + func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error { 418 + r, err := b.GetOrCreateRepo(ctx, user) 419 + if err != nil { 420 + return err 421 + } 422 + 423 + var si SyncInfo 424 + if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil { 425 + return err 426 + } 427 + 428 + // not found 429 + if si.Repo == 0 { 430 + if err := b.db.Create(&SyncInfo{ 431 + Repo: r.ID, 432 + }).Error; err != nil { 433 + return err 434 + } 435 + } 436 + 437 + if si.FollowsSynced { 438 + return nil 439 + } 440 + 441 + var follows []Follow 442 + var cursor string 443 + for { 444 + resp, err := atproto.RepoListRecords(ctx, b.client, "app.bsky.graph.follow", cursor, 100, b.mydid, false) 445 + if err != nil { 446 + return err 447 + } 448 + 449 + for _, rec := range resp.Records { 450 + if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok { 451 + fr, err := b.GetOrCreateRepo(ctx, fol.Subject) 452 + if err != nil { 453 + return err 454 + } 455 + 456 + puri, err := syntax.ParseATURI(rec.Uri) 457 + if err != nil { 458 + return err 459 + } 460 + 461 + follows = append(follows, Follow{ 462 + Created: time.Now(), 463 + Indexed: time.Now(), 464 + Rkey: puri.RecordKey().String(), 465 + Author: r.ID, 466 + Subject: fr.ID, 467 + }) 468 + } 469 + } 470 + 471 + if resp.Cursor == nil || len(resp.Records) == 0 { 472 + break 473 + } 474 + cursor = *resp.Cursor 475 + } 476 + 477 + if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil { 478 + return err 479 + } 480 + 481 + if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil { 482 + return err 483 + } 484 + 485 + fmt.Println("Got follows: ", len(follows)) 486 + 487 + return nil 488 + }
-1130
events.go
··· 1 - package main 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "fmt" 7 - "log/slog" 8 - "strings" 9 - "sync" 10 - "time" 11 - 12 - "github.com/bluesky-social/indigo/api/atproto" 13 - "github.com/bluesky-social/indigo/api/bsky" 14 - "github.com/bluesky-social/indigo/atproto/syntax" 15 - "github.com/bluesky-social/indigo/repo" 16 - lru "github.com/hashicorp/golang-lru/v2" 17 - "github.com/ipfs/go-cid" 18 - "github.com/jackc/pgx/v5/pgconn" 19 - "github.com/jackc/pgx/v5/pgxpool" 20 - "gorm.io/gorm" 21 - 22 - . "github.com/whyrusleeping/konbini/models" 23 - ) 24 - 25 - type PostgresBackend struct { 26 - db *gorm.DB 27 - pgx *pgxpool.Pool 28 - s *Server 29 - 30 - relevantDids map[string]bool 31 - rdLk sync.Mutex 32 - 33 - revCache *lru.TwoQueueCache[uint, string] 34 - 35 - repoCache *lru.TwoQueueCache[string, *Repo] 36 - reposLk sync.Mutex 37 - 38 - postInfoCache *lru.TwoQueueCache[string, cachedPostInfo] 39 - } 40 - 41 - func (b *PostgresBackend) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { 42 - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 43 - if err != nil { 44 - return fmt.Errorf("failed to read event repo: %w", err) 45 - } 46 - 47 - for _, op := range evt.Ops { 48 - switch op.Action { 49 - case "create": 50 - c, rec, err := r.GetRecordBytes(ctx, op.Path) 51 - if err != nil { 52 - return err 53 - } 54 - if err := b.HandleCreate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil { 55 - return fmt.Errorf("create record failed: %w", err) 56 - } 57 - case "update": 58 - c, rec, err := r.GetRecordBytes(ctx, op.Path) 59 - if err != nil { 60 - return err 61 - } 62 - if err := b.HandleUpdate(ctx, evt.Repo, evt.Rev, op.Path, rec, &c); err != nil { 63 - return fmt.Errorf("update record failed: %w", err) 64 - } 65 - case "delete": 66 - if err := b.HandleDelete(ctx, evt.Repo, evt.Rev, op.Path); err != nil { 67 - return fmt.Errorf("delete record failed: %w", err) 68 - } 69 - } 70 - } 71 - 72 - // TODO: sync with the Since field to make sure we don't miss events we care about 73 - /* 74 - if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { 75 - return fmt.Errorf("failed to update rev: %w", err) 76 - } 77 - */ 78 - 79 - return nil 80 - } 81 - 82 - func (b *PostgresBackend) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error { 83 - start := time.Now() 84 - 85 - rr, err := b.getOrCreateRepo(ctx, repo) 86 - if err != nil { 87 - return fmt.Errorf("get user failed: %w", err) 88 - } 89 - 90 - lrev, err := b.revForRepo(rr) 91 - if err != nil { 92 - return err 93 - } 94 - if lrev != "" { 95 - if rev < lrev { 96 - slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path) 97 - return nil 98 - } 99 - } 100 - 101 - parts := strings.Split(path, "/") 102 - if len(parts) != 2 { 103 - return fmt.Errorf("invalid path in HandleCreate: %q", path) 104 - } 105 - col := parts[0] 106 - rkey := parts[1] 107 - 108 - defer func() { 109 - handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds())) 110 - }() 111 - 112 - if rkey == "" { 113 - fmt.Printf("messed up path: %q\n", rkey) 114 - } 115 - 116 - switch col { 117 - case "app.bsky.feed.post": 118 - if err := b.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil { 119 - return err 120 - } 121 - case "app.bsky.feed.like": 122 - if err := b.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil { 123 - return err 124 - } 125 - case "app.bsky.feed.repost": 126 - if err := b.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil { 127 - return err 128 - } 129 - case "app.bsky.graph.follow": 130 - if err := b.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil { 131 - return err 132 - } 133 - case "app.bsky.graph.block": 134 - if err := b.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil { 135 - return err 136 - } 137 - case "app.bsky.graph.list": 138 - if err := b.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil { 139 - return err 140 - } 141 - case "app.bsky.graph.listitem": 142 - if err := b.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil { 143 - return err 144 - } 145 - case "app.bsky.graph.listblock": 146 - if err := b.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil { 147 - return err 148 - } 149 - case "app.bsky.actor.profile": 150 - if err := b.HandleCreateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil { 151 - return err 152 - } 153 - case "app.bsky.feed.generator": 154 - if err := b.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil { 155 - return err 156 - } 157 - case "app.bsky.feed.threadgate": 158 - if err := b.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil { 159 - return err 160 - } 161 - case "chat.bsky.actor.declaration": 162 - if err := b.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil { 163 - return err 164 - } 165 - case "app.bsky.feed.postgate": 166 - if err := b.HandleCreatePostGate(ctx, rr, rkey, *rec, *cid); err != nil { 167 - return err 168 - } 169 - case "app.bsky.graph.starterpack": 170 - if err := b.HandleCreateStarterPack(ctx, rr, rkey, *rec, *cid); err != nil { 171 - return err 172 - } 173 - default: 174 - slog.Debug("unrecognized record type", "repo", repo, "path", path, "rev", rev) 175 - } 176 - 177 - b.revCache.Add(rr.ID, rev) 178 - return nil 179 - } 180 - 181 - func (b *PostgresBackend) HandleCreatePost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 182 - exists, err := b.checkPostExists(ctx, repo, rkey) 183 - if err != nil { 184 - return err 185 - } 186 - 187 - // still technically a race condition if two creates for the same post happen concurrently... probably fine 188 - if exists { 189 - return nil 190 - } 191 - 192 - var rec bsky.FeedPost 193 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 194 - uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey 195 - slog.Warn("skipping post with malformed data", "uri", uri, "error", err) 196 - return nil // Skip this post rather than failing the entire event 197 - } 198 - 199 - reldids := []string{repo.Did} 200 - // care about a post if its in a thread of a user we are interested in 201 - if rec.Reply != nil && rec.Reply.Parent != nil && rec.Reply.Root != nil { 202 - reldids = append(reldids, rec.Reply.Parent.Uri, rec.Reply.Root.Uri) 203 - } 204 - // TODO: maybe also care if its mentioning a user we care about or quoting a user we care about? 205 - if !b.anyRelevantIdents(reldids...) { 206 - return nil 207 - } 208 - 209 - uri := "at://" + repo.Did + "/app.bsky.feed.post/" + rkey 210 - slog.Warn("adding post", "uri", uri) 211 - 212 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 213 - if err != nil { 214 - return fmt.Errorf("invalid timestamp: %w", err) 215 - } 216 - 217 - p := Post{ 218 - Created: created.Time(), 219 - Indexed: time.Now(), 220 - Author: repo.ID, 221 - Rkey: rkey, 222 - Raw: recb, 223 - Cid: cc.String(), 224 - } 225 - 226 - if rec.Reply != nil && rec.Reply.Parent != nil { 227 - if rec.Reply.Root == nil { 228 - return fmt.Errorf("post reply had nil root") 229 - } 230 - 231 - pinfo, err := b.postInfoForUri(ctx, rec.Reply.Parent.Uri) 232 - if err != nil { 233 - return fmt.Errorf("getting reply parent: %w", err) 234 - } 235 - 236 - p.ReplyTo = pinfo.ID 237 - p.ReplyToUsr = pinfo.Author 238 - 239 - thread, err := b.postIDForUri(ctx, rec.Reply.Root.Uri) 240 - if err != nil { 241 - return fmt.Errorf("getting thread root: %w", err) 242 - } 243 - 244 - p.InThread = thread 245 - 246 - if p.ReplyToUsr == b.s.myrepo.ID { 247 - if err := b.s.AddNotification(ctx, b.s.myrepo.ID, p.Author, uri, cc, NotifKindReply); err != nil { 248 - slog.Warn("failed to create notification", "uri", uri, "error", err) 249 - } 250 - } 251 - } 252 - 253 - if rec.Embed != nil { 254 - var rpref string 255 - if rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 256 - rpref = rec.Embed.EmbedRecord.Record.Uri 257 - } 258 - if rec.Embed.EmbedRecordWithMedia != nil && 259 - rec.Embed.EmbedRecordWithMedia.Record != nil && 260 - rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 261 - rpref = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 262 - } 263 - 264 - if rpref != "" && strings.Contains(rpref, "app.bsky.feed.post") { 265 - rp, err := b.postIDForUri(ctx, rpref) 266 - if err != nil { 267 - return fmt.Errorf("getting quote subject: %w", err) 268 - } 269 - 270 - p.Reposting = rp 271 - } 272 - } 273 - 274 - if err := b.doPostCreate(ctx, &p); err != nil { 275 - return err 276 - } 277 - 278 - // Check for mentions and create notifications 279 - if rec.Facets != nil { 280 - for _, facet := range rec.Facets { 281 - for _, feature := range facet.Features { 282 - if feature.RichtextFacet_Mention != nil { 283 - mentionDid := feature.RichtextFacet_Mention.Did 284 - // This is a mention 285 - mentionedRepo, err := b.getOrCreateRepo(ctx, mentionDid) 286 - if err != nil { 287 - slog.Warn("failed to get repo for mention", "did", mentionDid, "error", err) 288 - continue 289 - } 290 - 291 - // Create notification if the mentioned user is the current user 292 - if mentionedRepo.ID == b.s.myrepo.ID { 293 - if err := b.s.AddNotification(ctx, b.s.myrepo.ID, p.Author, uri, cc, NotifKindMention); err != nil { 294 - slog.Warn("failed to create mention notification", "uri", uri, "error", err) 295 - } 296 - } 297 - } 298 - } 299 - } 300 - } 301 - 302 - b.postInfoCache.Add(uri, cachedPostInfo{ 303 - ID: p.ID, 304 - Author: p.Author, 305 - }) 306 - 307 - return nil 308 - } 309 - 310 - func (b *PostgresBackend) doPostCreate(ctx context.Context, p *Post) error { 311 - /* 312 - if err := b.db.Clauses(clause.OnConflict{ 313 - Columns: []clause.Column{{Name: "author"}, {Name: "rkey"}}, 314 - DoUpdates: clause.AssignmentColumns([]string{"cid", "not_found", "raw", "created", "indexed"}), 315 - }).Create(p).Error; err != nil { 316 - return err 317 - } 318 - */ 319 - 320 - query := ` 321 - INSERT INTO posts (author, rkey, cid, not_found, raw, created, indexed, reposting, reply_to, reply_to_usr, in_thread) 322 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 323 - ON CONFLICT (author, rkey) 324 - DO UPDATE SET 325 - cid = $3, 326 - not_found = $4, 327 - raw = $5, 328 - created = $6, 329 - indexed = $7, 330 - reposting = $8, 331 - reply_to = $9, 332 - reply_to_usr = $10, 333 - in_thread = $11 334 - RETURNING id 335 - ` 336 - 337 - // Execute the query with parameters from the Post struct 338 - if err := b.pgx.QueryRow( 339 - ctx, 340 - query, 341 - p.Author, 342 - p.Rkey, 343 - p.Cid, 344 - p.NotFound, 345 - p.Raw, 346 - p.Created, 347 - p.Indexed, 348 - p.Reposting, 349 - p.ReplyTo, 350 - p.ReplyToUsr, 351 - p.InThread, 352 - ).Scan(&p.ID); err != nil { 353 - return err 354 - } 355 - 356 - return nil 357 - } 358 - 359 - func (b *PostgresBackend) HandleCreateLike(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 360 - var rec bsky.FeedLike 361 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 362 - return err 363 - } 364 - 365 - if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) { 366 - return nil 367 - } 368 - 369 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 370 - if err != nil { 371 - return fmt.Errorf("invalid timestamp: %w", err) 372 - } 373 - 374 - pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri) 375 - if err != nil { 376 - return fmt.Errorf("getting like subject: %w", err) 377 - } 378 - 379 - if _, err := b.pgx.Exec(ctx, `INSERT INTO "likes" ("created","indexed","author","rkey","subject","cid") VALUES ($1, $2, $3, $4, $5, $6)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID, cc.String()); err != nil { 380 - pgErr, ok := err.(*pgconn.PgError) 381 - if ok && pgErr.Code == "23505" { 382 - return nil 383 - } 384 - return err 385 - } 386 - 387 - // Create notification if the liked post belongs to the current user 388 - if pinfo.Author == b.s.myrepo.ID { 389 - uri := fmt.Sprintf("at://%s/app.bsky.feed.like/%s", repo.Did, rkey) 390 - if err := b.s.AddNotification(ctx, b.s.myrepo.ID, repo.ID, uri, cc, NotifKindLike); err != nil { 391 - slog.Warn("failed to create like notification", "uri", uri, "error", err) 392 - } 393 - } 394 - 395 - return nil 396 - } 397 - 398 - func (b *PostgresBackend) HandleCreateRepost(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 399 - var rec bsky.FeedRepost 400 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 401 - return err 402 - } 403 - 404 - if !b.anyRelevantIdents(repo.Did, rec.Subject.Uri) { 405 - return nil 406 - } 407 - 408 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 409 - if err != nil { 410 - return fmt.Errorf("invalid timestamp: %w", err) 411 - } 412 - 413 - pinfo, err := b.postInfoForUri(ctx, rec.Subject.Uri) 414 - if err != nil { 415 - return fmt.Errorf("getting repost subject: %w", err) 416 - } 417 - 418 - if _, err := b.pgx.Exec(ctx, `INSERT INTO "reposts" ("created","indexed","author","rkey","subject") VALUES ($1, $2, $3, $4, $5)`, created.Time(), time.Now(), repo.ID, rkey, pinfo.ID); err != nil { 419 - pgErr, ok := err.(*pgconn.PgError) 420 - if ok && pgErr.Code == "23505" { 421 - return nil 422 - } 423 - return err 424 - } 425 - 426 - // Create notification if the reposted post belongs to the current user 427 - if pinfo.Author == b.s.myrepo.ID { 428 - uri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", repo.Did, rkey) 429 - if err := b.s.AddNotification(ctx, b.s.myrepo.ID, repo.ID, uri, cc, NotifKindRepost); err != nil { 430 - slog.Warn("failed to create repost notification", "uri", uri, "error", err) 431 - } 432 - } 433 - 434 - return nil 435 - } 436 - 437 - func (b *PostgresBackend) HandleCreateFollow(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 438 - var rec bsky.GraphFollow 439 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 440 - return err 441 - } 442 - 443 - if !b.anyRelevantIdents(repo.Did, rec.Subject) { 444 - return nil 445 - } 446 - 447 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 448 - if err != nil { 449 - return fmt.Errorf("invalid timestamp: %w", err) 450 - } 451 - 452 - subj, err := b.getOrCreateRepo(ctx, rec.Subject) 453 - if err != nil { 454 - return err 455 - } 456 - 457 - if _, err := b.pgx.Exec(ctx, "INSERT INTO follows (created, indexed, author, rkey, subject) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING", created.Time(), time.Now(), repo.ID, rkey, subj.ID); err != nil { 458 - return err 459 - } 460 - 461 - return nil 462 - } 463 - 464 - func (b *PostgresBackend) HandleCreateBlock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 465 - var rec bsky.GraphBlock 466 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 467 - return err 468 - } 469 - 470 - if !b.anyRelevantIdents(repo.Did, rec.Subject) { 471 - return nil 472 - } 473 - 474 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 475 - if err != nil { 476 - return fmt.Errorf("invalid timestamp: %w", err) 477 - } 478 - 479 - subj, err := b.getOrCreateRepo(ctx, rec.Subject) 480 - if err != nil { 481 - return err 482 - } 483 - 484 - if err := b.db.Create(&Block{ 485 - Created: created.Time(), 486 - Indexed: time.Now(), 487 - Author: repo.ID, 488 - Rkey: rkey, 489 - Subject: subj.ID, 490 - }).Error; err != nil { 491 - return err 492 - } 493 - 494 - return nil 495 - } 496 - 497 - func (b *PostgresBackend) HandleCreateList(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 498 - var rec bsky.GraphList 499 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 500 - return err 501 - } 502 - 503 - if !b.anyRelevantIdents(repo.Did) { 504 - return nil 505 - } 506 - 507 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 508 - if err != nil { 509 - return fmt.Errorf("invalid timestamp: %w", err) 510 - } 511 - 512 - if err := b.db.Create(&List{ 513 - Created: created.Time(), 514 - Indexed: time.Now(), 515 - Author: repo.ID, 516 - Rkey: rkey, 517 - Raw: recb, 518 - }).Error; err != nil { 519 - return err 520 - } 521 - 522 - return nil 523 - } 524 - 525 - func (b *PostgresBackend) HandleCreateListitem(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 526 - var rec bsky.GraphListitem 527 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 528 - return err 529 - } 530 - if !b.anyRelevantIdents(repo.Did) { 531 - return nil 532 - } 533 - 534 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 535 - if err != nil { 536 - return fmt.Errorf("invalid timestamp: %w", err) 537 - } 538 - 539 - subj, err := b.getOrCreateRepo(ctx, rec.Subject) 540 - if err != nil { 541 - return err 542 - } 543 - 544 - list, err := b.getOrCreateList(ctx, rec.List) 545 - if err != nil { 546 - return err 547 - } 548 - 549 - if err := b.db.Create(&ListItem{ 550 - Created: created.Time(), 551 - Indexed: time.Now(), 552 - Author: repo.ID, 553 - Rkey: rkey, 554 - Subject: subj.ID, 555 - List: list.ID, 556 - }).Error; err != nil { 557 - return err 558 - } 559 - 560 - return nil 561 - } 562 - 563 - func (b *PostgresBackend) HandleCreateListblock(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 564 - var rec bsky.GraphListblock 565 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 566 - return err 567 - } 568 - 569 - if !b.anyRelevantIdents(repo.Did, rec.Subject) { 570 - return nil 571 - } 572 - 573 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 574 - if err != nil { 575 - return fmt.Errorf("invalid timestamp: %w", err) 576 - } 577 - 578 - list, err := b.getOrCreateList(ctx, rec.Subject) 579 - if err != nil { 580 - return err 581 - } 582 - 583 - if err := b.db.Create(&ListBlock{ 584 - Created: created.Time(), 585 - Indexed: time.Now(), 586 - Author: repo.ID, 587 - Rkey: rkey, 588 - List: list.ID, 589 - }).Error; err != nil { 590 - return err 591 - } 592 - 593 - return nil 594 - } 595 - 596 - func (b *PostgresBackend) HandleCreateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error { 597 - if !b.anyRelevantIdents(repo.Did) { 598 - return nil 599 - } 600 - 601 - if err := b.db.Create(&Profile{ 602 - //Created: created.Time(), 603 - Indexed: time.Now(), 604 - Repo: repo.ID, 605 - Raw: recb, 606 - Rev: rev, 607 - }).Error; err != nil { 608 - return err 609 - } 610 - 611 - return nil 612 - } 613 - 614 - func (b *PostgresBackend) HandleUpdateProfile(ctx context.Context, repo *Repo, rkey, rev string, recb []byte, cc cid.Cid) error { 615 - if !b.anyRelevantIdents(repo.Did) { 616 - return nil 617 - } 618 - 619 - if err := b.db.Create(&Profile{ 620 - Indexed: time.Now(), 621 - Repo: repo.ID, 622 - Raw: recb, 623 - Rev: rev, 624 - }).Error; err != nil { 625 - return err 626 - } 627 - 628 - return nil 629 - } 630 - 631 - func (b *PostgresBackend) HandleCreateFeedGenerator(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 632 - if !b.anyRelevantIdents(repo.Did) { 633 - return nil 634 - } 635 - 636 - var rec bsky.FeedGenerator 637 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 638 - return err 639 - } 640 - 641 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 642 - if err != nil { 643 - return fmt.Errorf("invalid timestamp: %w", err) 644 - } 645 - 646 - if err := b.db.Create(&FeedGenerator{ 647 - Created: created.Time(), 648 - Indexed: time.Now(), 649 - Author: repo.ID, 650 - Rkey: rkey, 651 - Did: rec.Did, 652 - Raw: recb, 653 - }).Error; err != nil { 654 - return err 655 - } 656 - 657 - return nil 658 - } 659 - 660 - func (b *PostgresBackend) HandleCreateThreadgate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 661 - if !b.anyRelevantIdents(repo.Did) { 662 - return nil 663 - } 664 - var rec bsky.FeedThreadgate 665 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 666 - return err 667 - } 668 - 669 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 670 - if err != nil { 671 - return fmt.Errorf("invalid timestamp: %w", err) 672 - } 673 - 674 - pid, err := b.postIDForUri(ctx, rec.Post) 675 - if err != nil { 676 - return err 677 - } 678 - 679 - if err := b.db.Create(&ThreadGate{ 680 - Created: created.Time(), 681 - Indexed: time.Now(), 682 - Author: repo.ID, 683 - Rkey: rkey, 684 - Post: pid, 685 - }).Error; err != nil { 686 - return err 687 - } 688 - 689 - return nil 690 - } 691 - 692 - func (b *PostgresBackend) HandleCreateChatDeclaration(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 693 - // TODO: maybe track these? 694 - return nil 695 - } 696 - 697 - func (b *PostgresBackend) HandleCreatePostGate(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 698 - if !b.anyRelevantIdents(repo.Did) { 699 - return nil 700 - } 701 - var rec bsky.FeedPostgate 702 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 703 - return err 704 - } 705 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 706 - if err != nil { 707 - return fmt.Errorf("invalid timestamp: %w", err) 708 - } 709 - 710 - refPost, err := b.postInfoForUri(ctx, rec.Post) 711 - if err != nil { 712 - return err 713 - } 714 - 715 - if err := b.db.Create(&PostGate{ 716 - Created: created.Time(), 717 - Indexed: time.Now(), 718 - Author: repo.ID, 719 - Rkey: rkey, 720 - Subject: refPost.ID, 721 - Raw: recb, 722 - }).Error; err != nil { 723 - return err 724 - } 725 - 726 - return nil 727 - } 728 - 729 - func (b *PostgresBackend) HandleCreateStarterPack(ctx context.Context, repo *Repo, rkey string, recb []byte, cc cid.Cid) error { 730 - if !b.anyRelevantIdents(repo.Did) { 731 - return nil 732 - } 733 - var rec bsky.GraphStarterpack 734 - if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 735 - return err 736 - } 737 - created, err := syntax.ParseDatetimeLenient(rec.CreatedAt) 738 - if err != nil { 739 - return fmt.Errorf("invalid timestamp: %w", err) 740 - } 741 - 742 - list, err := b.getOrCreateList(ctx, rec.List) 743 - if err != nil { 744 - return err 745 - } 746 - 747 - if err := b.db.Create(&StarterPack{ 748 - Created: created.Time(), 749 - Indexed: time.Now(), 750 - Author: repo.ID, 751 - Rkey: rkey, 752 - Raw: recb, 753 - List: list.ID, 754 - }).Error; err != nil { 755 - return err 756 - } 757 - 758 - return nil 759 - } 760 - 761 - func (b *PostgresBackend) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error { 762 - start := time.Now() 763 - 764 - rr, err := b.getOrCreateRepo(ctx, repo) 765 - if err != nil { 766 - return fmt.Errorf("get user failed: %w", err) 767 - } 768 - 769 - lrev, err := b.revForRepo(rr) 770 - if err != nil { 771 - return err 772 - } 773 - if lrev != "" { 774 - if rev < lrev { 775 - //slog.Info("skipping old rev create", "did", rr.Did, "rev", rev, "oldrev", lrev, "path", path) 776 - return nil 777 - } 778 - } 779 - 780 - parts := strings.Split(path, "/") 781 - if len(parts) != 2 { 782 - return fmt.Errorf("invalid path in HandleCreate: %q", path) 783 - } 784 - col := parts[0] 785 - rkey := parts[1] 786 - 787 - defer func() { 788 - handleOpHist.WithLabelValues("update", col).Observe(float64(time.Since(start).Milliseconds())) 789 - }() 790 - 791 - if rkey == "" { 792 - fmt.Printf("messed up path: %q\n", rkey) 793 - } 794 - 795 - switch col { 796 - /* 797 - case "app.bsky.feed.post": 798 - if err := s.HandleCreatePost(ctx, rr, rkey, *rec, *cid); err != nil { 799 - return err 800 - } 801 - case "app.bsky.feed.like": 802 - if err := s.HandleCreateLike(ctx, rr, rkey, *rec, *cid); err != nil { 803 - return err 804 - } 805 - case "app.bsky.feed.repost": 806 - if err := s.HandleCreateRepost(ctx, rr, rkey, *rec, *cid); err != nil { 807 - return err 808 - } 809 - case "app.bsky.graph.follow": 810 - if err := s.HandleCreateFollow(ctx, rr, rkey, *rec, *cid); err != nil { 811 - return err 812 - } 813 - case "app.bsky.graph.block": 814 - if err := s.HandleCreateBlock(ctx, rr, rkey, *rec, *cid); err != nil { 815 - return err 816 - } 817 - case "app.bsky.graph.list": 818 - if err := s.HandleCreateList(ctx, rr, rkey, *rec, *cid); err != nil { 819 - return err 820 - } 821 - case "app.bsky.graph.listitem": 822 - if err := s.HandleCreateListitem(ctx, rr, rkey, *rec, *cid); err != nil { 823 - return err 824 - } 825 - case "app.bsky.graph.listblock": 826 - if err := s.HandleCreateListblock(ctx, rr, rkey, *rec, *cid); err != nil { 827 - return err 828 - } 829 - */ 830 - case "app.bsky.actor.profile": 831 - if err := b.HandleUpdateProfile(ctx, rr, rkey, rev, *rec, *cid); err != nil { 832 - return err 833 - } 834 - /* 835 - case "app.bsky.feed.generator": 836 - if err := s.HandleCreateFeedGenerator(ctx, rr, rkey, *rec, *cid); err != nil { 837 - return err 838 - } 839 - case "app.bsky.feed.threadgate": 840 - if err := s.HandleCreateThreadgate(ctx, rr, rkey, *rec, *cid); err != nil { 841 - return err 842 - } 843 - case "chat.bsky.actor.declaration": 844 - if err := s.HandleCreateChatDeclaration(ctx, rr, rkey, *rec, *cid); err != nil { 845 - return err 846 - } 847 - */ 848 - default: 849 - slog.Debug("unrecognized record type in update", "repo", repo, "path", path, "rev", rev) 850 - } 851 - 852 - return nil 853 - } 854 - 855 - func (b *PostgresBackend) HandleDelete(ctx context.Context, repo string, rev string, path string) error { 856 - start := time.Now() 857 - 858 - rr, err := b.getOrCreateRepo(ctx, repo) 859 - if err != nil { 860 - return fmt.Errorf("get user failed: %w", err) 861 - } 862 - 863 - lrev, ok := b.revCache.Get(rr.ID) 864 - if ok { 865 - if rev < lrev { 866 - //slog.Info("skipping old rev delete", "did", rr.Did, "rev", rev, "oldrev", lrev) 867 - return nil 868 - } 869 - } 870 - 871 - parts := strings.Split(path, "/") 872 - if len(parts) != 2 { 873 - return fmt.Errorf("invalid path in HandleDelete: %q", path) 874 - } 875 - col := parts[0] 876 - rkey := parts[1] 877 - 878 - defer func() { 879 - handleOpHist.WithLabelValues("create", col).Observe(float64(time.Since(start).Milliseconds())) 880 - }() 881 - 882 - switch col { 883 - case "app.bsky.feed.post": 884 - if err := b.HandleDeletePost(ctx, rr, rkey); err != nil { 885 - return err 886 - } 887 - case "app.bsky.feed.like": 888 - if err := b.HandleDeleteLike(ctx, rr, rkey); err != nil { 889 - return err 890 - } 891 - case "app.bsky.feed.repost": 892 - if err := b.HandleDeleteRepost(ctx, rr, rkey); err != nil { 893 - return err 894 - } 895 - case "app.bsky.graph.follow": 896 - if err := b.HandleDeleteFollow(ctx, rr, rkey); err != nil { 897 - return err 898 - } 899 - case "app.bsky.graph.block": 900 - if err := b.HandleDeleteBlock(ctx, rr, rkey); err != nil { 901 - return err 902 - } 903 - case "app.bsky.graph.list": 904 - if err := b.HandleDeleteList(ctx, rr, rkey); err != nil { 905 - return err 906 - } 907 - case "app.bsky.graph.listitem": 908 - if err := b.HandleDeleteListitem(ctx, rr, rkey); err != nil { 909 - return err 910 - } 911 - case "app.bsky.graph.listblock": 912 - if err := b.HandleDeleteListblock(ctx, rr, rkey); err != nil { 913 - return err 914 - } 915 - case "app.bsky.actor.profile": 916 - if err := b.HandleDeleteProfile(ctx, rr, rkey); err != nil { 917 - return err 918 - } 919 - case "app.bsky.feed.generator": 920 - if err := b.HandleDeleteFeedGenerator(ctx, rr, rkey); err != nil { 921 - return err 922 - } 923 - case "app.bsky.feed.threadgate": 924 - if err := b.HandleDeleteThreadgate(ctx, rr, rkey); err != nil { 925 - return err 926 - } 927 - default: 928 - slog.Warn("delete unrecognized record type", "repo", repo, "path", path, "rev", rev) 929 - } 930 - 931 - b.revCache.Add(rr.ID, rev) 932 - return nil 933 - } 934 - 935 - func (b *PostgresBackend) HandleDeletePost(ctx context.Context, repo *Repo, rkey string) error { 936 - var p Post 937 - if err := b.db.Find(&p, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 938 - return err 939 - } 940 - 941 - if p.ID == 0 { 942 - //slog.Warn("delete of unknown post record", "repo", repo.Did, "rkey", rkey) 943 - return nil 944 - } 945 - 946 - if err := b.db.Delete(&Post{}, p.ID).Error; err != nil { 947 - return err 948 - } 949 - 950 - return nil 951 - } 952 - 953 - func (b *PostgresBackend) HandleDeleteLike(ctx context.Context, repo *Repo, rkey string) error { 954 - var like Like 955 - if err := b.db.Find(&like, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 956 - return err 957 - } 958 - 959 - if like.ID == 0 { 960 - //slog.Warn("delete of missing like", "repo", repo.Did, "rkey", rkey) 961 - return nil 962 - } 963 - 964 - if err := b.db.Exec("DELETE FROM likes WHERE id = ?", like.ID).Error; err != nil { 965 - return err 966 - } 967 - 968 - return nil 969 - } 970 - 971 - func (b *PostgresBackend) HandleDeleteRepost(ctx context.Context, repo *Repo, rkey string) error { 972 - var repost Repost 973 - if err := b.db.Find(&repost, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 974 - return err 975 - } 976 - 977 - if repost.ID == 0 { 978 - //return fmt.Errorf("delete of missing repost: %s %s", repo.Did, rkey) 979 - return nil 980 - } 981 - 982 - if err := b.db.Exec("DELETE FROM reposts WHERE id = ?", repost.ID).Error; err != nil { 983 - return err 984 - } 985 - 986 - return nil 987 - } 988 - 989 - func (b *PostgresBackend) HandleDeleteFollow(ctx context.Context, repo *Repo, rkey string) error { 990 - var follow Follow 991 - if err := b.db.Find(&follow, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 992 - return err 993 - } 994 - 995 - if follow.ID == 0 { 996 - //slog.Warn("delete of missing follow", "repo", repo.Did, "rkey", rkey) 997 - return nil 998 - } 999 - 1000 - if err := b.db.Exec("DELETE FROM follows WHERE id = ?", follow.ID).Error; err != nil { 1001 - return err 1002 - } 1003 - 1004 - return nil 1005 - } 1006 - 1007 - func (b *PostgresBackend) HandleDeleteBlock(ctx context.Context, repo *Repo, rkey string) error { 1008 - var block Block 1009 - if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1010 - return err 1011 - } 1012 - 1013 - if block.ID == 0 { 1014 - //slog.Warn("delete of missing block", "repo", repo.Did, "rkey", rkey) 1015 - return nil 1016 - } 1017 - 1018 - if err := b.db.Exec("DELETE FROM blocks WHERE id = ?", block.ID).Error; err != nil { 1019 - return err 1020 - } 1021 - 1022 - return nil 1023 - } 1024 - 1025 - func (b *PostgresBackend) HandleDeleteList(ctx context.Context, repo *Repo, rkey string) error { 1026 - var list List 1027 - if err := b.db.Find(&list, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1028 - return err 1029 - } 1030 - 1031 - if list.ID == 0 { 1032 - return nil 1033 - //return fmt.Errorf("delete of missing list: %s %s", repo.Did, rkey) 1034 - } 1035 - 1036 - if err := b.db.Exec("DELETE FROM lists WHERE id = ?", list.ID).Error; err != nil { 1037 - return err 1038 - } 1039 - 1040 - return nil 1041 - } 1042 - 1043 - func (b *PostgresBackend) HandleDeleteListitem(ctx context.Context, repo *Repo, rkey string) error { 1044 - var item ListItem 1045 - if err := b.db.Find(&item, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1046 - return err 1047 - } 1048 - 1049 - if item.ID == 0 { 1050 - return nil 1051 - //return fmt.Errorf("delete of missing listitem: %s %s", repo.Did, rkey) 1052 - } 1053 - 1054 - if err := b.db.Exec("DELETE FROM list_items WHERE id = ?", item.ID).Error; err != nil { 1055 - return err 1056 - } 1057 - 1058 - return nil 1059 - } 1060 - 1061 - func (b *PostgresBackend) HandleDeleteListblock(ctx context.Context, repo *Repo, rkey string) error { 1062 - var block ListBlock 1063 - if err := b.db.Find(&block, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1064 - return err 1065 - } 1066 - 1067 - if block.ID == 0 { 1068 - return nil 1069 - //return fmt.Errorf("delete of missing listblock: %s %s", repo.Did, rkey) 1070 - } 1071 - 1072 - if err := b.db.Exec("DELETE FROM list_blocks WHERE id = ?", block.ID).Error; err != nil { 1073 - return err 1074 - } 1075 - 1076 - return nil 1077 - } 1078 - 1079 - func (b *PostgresBackend) HandleDeleteFeedGenerator(ctx context.Context, repo *Repo, rkey string) error { 1080 - var feedgen FeedGenerator 1081 - if err := b.db.Find(&feedgen, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1082 - return err 1083 - } 1084 - 1085 - if feedgen.ID == 0 { 1086 - return nil 1087 - //return fmt.Errorf("delete of missing feedgen: %s %s", repo.Did, rkey) 1088 - } 1089 - 1090 - if err := b.db.Exec("DELETE FROM feed_generators WHERE id = ?", feedgen.ID).Error; err != nil { 1091 - return err 1092 - } 1093 - 1094 - return nil 1095 - } 1096 - 1097 - func (b *PostgresBackend) HandleDeleteThreadgate(ctx context.Context, repo *Repo, rkey string) error { 1098 - var threadgate ThreadGate 1099 - if err := b.db.Find(&threadgate, "author = ? AND rkey = ?", repo.ID, rkey).Error; err != nil { 1100 - return err 1101 - } 1102 - 1103 - if threadgate.ID == 0 { 1104 - return nil 1105 - //return fmt.Errorf("delete of missing threadgate: %s %s", repo.Did, rkey) 1106 - } 1107 - 1108 - if err := b.db.Exec("DELETE FROM thread_gates WHERE id = ?", threadgate.ID).Error; err != nil { 1109 - return err 1110 - } 1111 - 1112 - return nil 1113 - } 1114 - 1115 - func (b *PostgresBackend) HandleDeleteProfile(ctx context.Context, repo *Repo, rkey string) error { 1116 - var profile Profile 1117 - if err := b.db.Find(&profile, "repo = ?", repo.ID).Error; err != nil { 1118 - return err 1119 - } 1120 - 1121 - if profile.ID == 0 { 1122 - return nil 1123 - } 1124 - 1125 - if err := b.db.Exec("DELETE FROM profiles WHERE id = ?", profile.ID).Error; err != nil { 1126 - return err 1127 - } 1128 - 1129 - return nil 1130 - }
+34 -33
handlers.go
··· 17 17 "github.com/labstack/gommon/log" 18 18 "github.com/whyrusleeping/market/models" 19 19 20 + "github.com/whyrusleeping/konbini/backend" 20 21 . "github.com/whyrusleeping/konbini/models" 21 22 ) 22 23 ··· 56 57 57 58 func (s *Server) handleGetRelevantDids(e echo.Context) error { 58 59 return e.JSON(200, map[string]any{ 59 - "dids": s.backend.relevantDids, 60 + "dids": s.backend.GetRelevantDids(), 60 61 }) 61 62 } 62 63 ··· 105 106 106 107 postUri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, rkey) 107 108 108 - p, err := s.backend.getPostByUri(ctx, postUri, "*") 109 + p, err := s.backend.GetPostByUri(ctx, postUri, "*") 109 110 if err != nil { 110 111 return err 111 112 } ··· 134 135 return err 135 136 } 136 137 137 - r, err := s.backend.getOrCreateRepo(ctx, accdid) 138 + r, err := s.backend.GetOrCreateRepo(ctx, accdid) 138 139 if err != nil { 139 140 return err 140 141 } 141 142 142 143 var profile models.Profile 143 - if err := s.backend.db.Find(&profile, "repo = ?", r.ID).Error; err != nil { 144 + if err := s.db.Find(&profile, "repo = ?", r.ID).Error; err != nil { 144 145 return err 145 146 } 146 147 ··· 169 170 return err 170 171 } 171 172 172 - r, err := s.backend.getOrCreateRepo(ctx, accdid) 173 + r, err := s.backend.GetOrCreateRepo(ctx, accdid) 173 174 if err != nil { 174 175 return err 175 176 } ··· 188 189 } 189 190 190 191 var dbposts []models.Post 191 - if err := s.backend.db.Raw("SELECT * FROM posts WHERE author = ? AND created < ? ORDER BY created DESC LIMIT ?", r.ID, tcursor, limit).Scan(&dbposts).Error; err != nil { 192 + if err := s.db.Raw("SELECT * FROM posts WHERE author = ? AND created < ? ORDER BY created DESC LIMIT ?", r.ID, tcursor, limit).Scan(&dbposts).Error; err != nil { 192 193 return err 193 194 } 194 195 ··· 258 259 func (s *Server) handleGetFollowingFeed(e echo.Context) error { 259 260 ctx := e.Request().Context() 260 261 261 - myr, err := s.backend.getOrCreateRepo(ctx, s.mydid) 262 + myr, err := s.backend.GetOrCreateRepo(ctx, s.mydid) 262 263 if err != nil { 263 264 return err 264 265 } ··· 276 277 tcursor = t 277 278 } 278 279 var dbposts []models.Post 279 - if err := s.backend.db.Raw("select * from posts where reply_to = 0 AND author IN (select subject from follows where author = ?) AND created < ? order by created DESC limit ?", myr.ID, tcursor, limit).Scan(&dbposts).Error; err != nil { 280 + if err := s.db.Raw("select * from posts where reply_to = 0 AND author IN (select subject from follows where author = ?) AND created < ? order by created DESC limit ?", myr.ID, tcursor, limit).Scan(&dbposts).Error; err != nil { 280 281 return err 281 282 } 282 283 ··· 296 297 297 298 func (s *Server) getAuthorInfo(ctx context.Context, r *models.Repo) (*authorInfo, error) { 298 299 var profile models.Profile 299 - if err := s.backend.db.Find(&profile, "repo = ?", r.ID).Error; err != nil { 300 + if err := s.db.Find(&profile, "repo = ?", r.ID).Error; err != nil { 300 301 return nil, err 301 302 } 302 303 ··· 333 334 334 335 go func() { 335 336 defer wg.Done() 336 - if err := s.backend.db.Raw("SELECT count(*) FROM likes WHERE subject = ?", pid).Scan(&pc.Likes).Error; err != nil { 337 + if err := s.db.Raw("SELECT count(*) FROM likes WHERE subject = ?", pid).Scan(&pc.Likes).Error; err != nil { 337 338 slog.Error("failed to get likes count", "post", pid, "error", err) 338 339 } 339 340 }() 340 341 341 342 go func() { 342 343 defer wg.Done() 343 - if err := s.backend.db.Raw("SELECT count(*) FROM reposts WHERE subject = ?", pid).Scan(&pc.Reposts).Error; err != nil { 344 + if err := s.db.Raw("SELECT count(*) FROM reposts WHERE subject = ?", pid).Scan(&pc.Reposts).Error; err != nil { 344 345 slog.Error("failed to get reposts count", "post", pid, "error", err) 345 346 } 346 347 }() 347 348 348 349 go func() { 349 350 defer wg.Done() 350 - if err := s.backend.db.Raw("SELECT count(*) FROM posts WHERE reply_to = ?", pid).Scan(&pc.Replies).Error; err != nil { 351 + if err := s.db.Raw("SELECT count(*) FROM posts WHERE reply_to = ?", pid).Scan(&pc.Replies).Error; err != nil { 351 352 slog.Error("failed to get replies count", "post", pid, "error", err) 352 353 } 353 354 }() ··· 366 367 go func(ix int) { 367 368 defer wg.Done() 368 369 p := dbposts[ix] 369 - r, err := s.backend.getRepoByID(ctx, p.Author) 370 + r, err := s.backend.GetRepoByID(ctx, p.Author) 370 371 if err != nil { 371 372 fmt.Println("failed to get repo: ", err) 372 373 posts[ix] = postResponse{ ··· 434 435 435 436 func (s *Server) checkViewerLike(ctx context.Context, pid uint) *viewerLike { 436 437 var like Like 437 - if err := s.backend.db.Raw("SELECT * FROM likes WHERE subject = ? AND author = ?", pid, s.myrepo.ID).Scan(&like).Error; err != nil { 438 + if err := s.db.Raw("SELECT * FROM likes WHERE subject = ? AND author = ?", pid, s.myrepo.ID).Scan(&like).Error; err != nil { 438 439 slog.Error("failed to lookup like", "error", err) 439 440 return nil 440 441 } ··· 511 512 quotedURI := embedRecord.Record.Uri 512 513 quotedCid := embedRecord.Record.Cid 513 514 514 - quotedPost, err := s.backend.getPostByUri(ctx, quotedURI, "*") 515 + quotedPost, err := s.backend.GetPostByUri(ctx, quotedURI, "*") 515 516 if err != nil { 516 517 slog.Warn("failed to get quoted post", "uri", quotedURI, "error", err) 517 518 s.addMissingPost(ctx, quotedURI) ··· 529 530 return s.buildQuoteFallback(quotedURI, quotedCid) 530 531 } 531 532 532 - quotedRepo, err := s.backend.getRepoByID(ctx, quotedPost.Author) 533 + quotedRepo, err := s.backend.GetRepoByID(ctx, quotedPost.Author) 533 534 if err != nil { 534 535 slog.Warn("failed to get quoted post author", "error", err) 535 536 return s.buildQuoteFallback(quotedURI, quotedCid) ··· 576 577 577 578 // Get the requested post to find the thread root 578 579 var requestedPost models.Post 579 - if err := s.backend.db.Find(&requestedPost, "id = ?", postID).Error; err != nil { 580 + if err := s.db.Find(&requestedPost, "id = ?", postID).Error; err != nil { 580 581 return err 581 582 } 582 583 ··· 595 596 // Get all posts in this thread 596 597 var dbposts []models.Post 597 598 query := "SELECT * FROM posts WHERE id = ? OR in_thread = ? ORDER BY created ASC" 598 - if err := s.backend.db.Raw(query, rootPostID, rootPostID).Scan(&dbposts).Error; err != nil { 599 + if err := s.db.Raw(query, rootPostID, rootPostID).Scan(&dbposts).Error; err != nil { 599 600 return err 600 601 } 601 602 602 603 // Build response for each post 603 604 posts := []postResponse{} 604 605 for _, p := range dbposts { 605 - r, err := s.backend.getRepoByID(ctx, p.Author) 606 + r, err := s.backend.GetRepoByID(ctx, p.Author) 606 607 if err != nil { 607 608 return err 608 609 } ··· 676 677 677 678 // Get all likes for this post 678 679 var likes []models.Like 679 - if err := s.backend.db.Find(&likes, "subject = ?", postID).Error; err != nil { 680 + if err := s.db.Find(&likes, "subject = ?", postID).Error; err != nil { 680 681 return err 681 682 } 682 683 683 684 users := []engagementUser{} 684 685 for _, like := range likes { 685 - r, err := s.backend.getRepoByID(ctx, like.Author) 686 + r, err := s.backend.GetRepoByID(ctx, like.Author) 686 687 if err != nil { 687 688 slog.Error("failed to get repo for like author", "error", err) 688 689 continue ··· 697 698 698 699 // Get profile if available 699 700 var profile models.Profile 700 - s.backend.db.Find(&profile, "repo = ?", r.ID) 701 + s.db.Find(&profile, "repo = ?", r.ID) 701 702 702 703 var prof *bsky.ActorProfile 703 704 if len(profile.Raw) > 0 { ··· 736 737 737 738 // Get all reposts for this post 738 739 var reposts []models.Repost 739 - if err := s.backend.db.Find(&reposts, "subject = ?", postID).Error; err != nil { 740 + if err := s.db.Find(&reposts, "subject = ?", postID).Error; err != nil { 740 741 return err 741 742 } 742 743 743 744 users := []engagementUser{} 744 745 for _, repost := range reposts { 745 - r, err := s.backend.getRepoByID(ctx, repost.Author) 746 + r, err := s.backend.GetRepoByID(ctx, repost.Author) 746 747 if err != nil { 747 748 slog.Error("failed to get repo for repost author", "error", err) 748 749 continue ··· 757 758 758 759 // Get profile if available 759 760 var profile models.Profile 760 - s.backend.db.Find(&profile, "repo = ?", r.ID) 761 + s.db.Find(&profile, "repo = ?", r.ID) 761 762 762 763 var prof *bsky.ActorProfile 763 764 if len(profile.Raw) > 0 { ··· 796 797 797 798 // Get all replies to this post 798 799 var replies []models.Post 799 - if err := s.backend.db.Find(&replies, "reply_to = ?", postID).Error; err != nil { 800 + if err := s.db.Find(&replies, "reply_to = ?", postID).Error; err != nil { 800 801 return err 801 802 } 802 803 ··· 810 811 } 811 812 seen[reply.Author] = true 812 813 813 - r, err := s.backend.getRepoByID(ctx, reply.Author) 814 + r, err := s.backend.GetRepoByID(ctx, reply.Author) 814 815 if err != nil { 815 816 slog.Error("failed to get repo for reply author", "error", err) 816 817 continue ··· 825 826 826 827 // Get profile if available 827 828 var profile models.Profile 828 - s.backend.db.Find(&profile, "repo = ?", r.ID) 829 + s.db.Find(&profile, "repo = ?", r.ID) 829 830 830 831 var prof *bsky.ActorProfile 831 832 if len(profile.Raw) > 0 { ··· 932 933 query := `SELECT * FROM notifications WHERE "for" = ?` 933 934 if cursorID > 0 { 934 935 query += ` AND id < ?` 935 - if err := s.backend.db.Raw(query+" ORDER BY created_at DESC LIMIT ?", s.myrepo.ID, cursorID, limit).Scan(&notifications).Error; err != nil { 936 + if err := s.db.Raw(query+" ORDER BY created_at DESC LIMIT ?", s.myrepo.ID, cursorID, limit).Scan(&notifications).Error; err != nil { 936 937 return err 937 938 } 938 939 } else { 939 - if err := s.backend.db.Raw(query+" ORDER BY created_at DESC LIMIT ?", s.myrepo.ID, limit).Scan(&notifications).Error; err != nil { 940 + if err := s.db.Raw(query+" ORDER BY created_at DESC LIMIT ?", s.myrepo.ID, limit).Scan(&notifications).Error; err != nil { 940 941 return err 941 942 } 942 943 } ··· 945 946 results := []notificationResponse{} 946 947 for _, notif := range notifications { 947 948 // Get author info 948 - author, err := s.backend.getRepoByID(ctx, notif.Author) 949 + author, err := s.backend.GetRepoByID(ctx, notif.Author) 949 950 if err != nil { 950 951 slog.Error("failed to get repo for notification author", "error", err) 951 952 continue ··· 966 967 } 967 968 968 969 // Try to get source post preview for reply/mention notifications 969 - if notif.Kind == NotifKindReply || notif.Kind == NotifKindMention { 970 + if notif.Kind == backend.NotifKindReply || notif.Kind == backend.NotifKindMention { 970 971 // Parse URI to get post 971 - p, err := s.backend.getPostByUri(ctx, notif.Source, "*") 972 + p, err := s.backend.GetPostByUri(ctx, notif.Source, "*") 972 973 if err == nil && p.Raw != nil && len(p.Raw) > 0 { 973 974 var fp bsky.FeedPost 974 975 if err := fp.UnmarshalCBOR(bytes.NewReader(p.Raw)); err == nil {
+2 -2
hydration/hydrator.go
··· 10 10 type Hydrator struct { 11 11 db *gorm.DB 12 12 dir identity.Directory 13 - backend backend.RecordTracker 13 + backend *backend.PostgresBackend 14 14 } 15 15 16 16 // NewHydrator creates a new Hydrator 17 - func NewHydrator(db *gorm.DB, dir identity.Directory, backend backend.RecordTracker) *Hydrator { 17 + func NewHydrator(db *gorm.DB, dir identity.Directory, backend *backend.PostgresBackend) *Hydrator { 18 18 return &Hydrator{ 19 19 db: db, 20 20 dir: dir,
+30 -7
hydration/post.go
··· 39 39 ctx, span := tracer.Start(ctx, "hydratePost") 40 40 defer span.End() 41 41 42 - // Query post from database 43 - var dbPost models.Post 44 - err := h.db.Raw(`SELECT * FROM posts 45 - WHERE author = (SELECT id FROM repos WHERE did = ?) 46 - AND rkey = ? 47 - `, extractDIDFromURI(uri), extractRkeyFromURI(uri)).Scan(&dbPost).Error 42 + autoFetch, _ := ctx.Value("auto-fetch").(bool) 48 43 44 + authorDid := extractDIDFromURI(uri) 45 + r, err := h.backend.GetOrCreateRepo(ctx, authorDid) 49 46 if err != nil { 47 + return nil, err 48 + } 49 + 50 + // Query post from database 51 + var dbPost models.Post 52 + if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 50 53 return nil, fmt.Errorf("failed to query post: %w", err) 51 54 } 52 55 53 56 if dbPost.NotFound || len(dbPost.Raw) == 0 { 54 - return nil, fmt.Errorf("post not found") 57 + if autoFetch { 58 + h.AddMissingRecord(uri, true) 59 + if err := h.db.Raw(`SELECT * FROM posts WHERE author = ? AND rkey = ? `, r.ID, extractRkeyFromURI(uri)).Scan(&dbPost).Error; err != nil { 60 + return nil, fmt.Errorf("failed to query post: %w", err) 61 + } 62 + if dbPost.NotFound || len(dbPost.Raw) == 0 { 63 + return nil, fmt.Errorf("post not found") 64 + } 65 + } else { 66 + return nil, fmt.Errorf("post not found") 67 + } 55 68 } 56 69 57 70 // Unmarshal post record ··· 69 82 // Get engagement counts 70 83 var likes, reposts, replies int 71 84 wg.Go(func() { 85 + _, span := tracer.Start(ctx, "likeCounts") 86 + defer span.End() 72 87 h.db.Raw("SELECT COUNT(*) FROM likes WHERE subject = ?", dbPost.ID).Scan(&likes) 73 88 }) 74 89 wg.Go(func() { 90 + _, span := tracer.Start(ctx, "repostCounts") 91 + defer span.End() 75 92 h.db.Raw("SELECT COUNT(*) FROM reposts WHERE subject = ?", dbPost.ID).Scan(&reposts) 76 93 }) 77 94 wg.Go(func() { 95 + _, span := tracer.Start(ctx, "replyCounts") 96 + defer span.End() 78 97 h.db.Raw("SELECT COUNT(*) FROM posts WHERE reply_to = ?", dbPost.ID).Scan(&replies) 79 98 }) 80 99 ··· 82 101 var likeRkey string 83 102 if viewerDID != "" { 84 103 wg.Go(func() { 104 + _, span := tracer.Start(ctx, "viewerLikeState") 105 + defer span.End() 85 106 h.db.Raw(` 86 107 SELECT l.rkey FROM likes l 87 108 WHERE l.subject = ? ··· 174 195 if embed == nil { 175 196 return nil 176 197 } 198 + _, span := tracer.Start(ctx, "formatEmbed") 199 + defer span.End() 177 200 178 201 result := &bsky.FeedDefs_PostView_Embed{} 179 202
+17 -42
main.go
··· 25 25 "github.com/bluesky-social/indigo/util/cliutil" 26 26 xrpclib "github.com/bluesky-social/indigo/xrpc" 27 27 "github.com/gorilla/websocket" 28 - lru "github.com/hashicorp/golang-lru/v2" 29 28 "github.com/ipfs/go-cid" 30 29 "github.com/jackc/pgx/v5/pgxpool" 31 30 "github.com/prometheus/client_golang/prometheus" 32 31 "github.com/prometheus/client_golang/prometheus/promauto" 33 32 "github.com/urfave/cli/v2" 33 + "github.com/whyrusleeping/konbini/backend" 34 34 "github.com/whyrusleeping/konbini/xrpc" 35 35 "go.opentelemetry.io/otel" 36 36 "go.opentelemetry.io/otel/attribute" ··· 38 38 "go.opentelemetry.io/otel/sdk/resource" 39 39 tracesdk "go.opentelemetry.io/otel/sdk/trace" 40 40 semconv "go.opentelemetry.io/otel/semconv/v1.20.0" 41 + "gorm.io/gorm" 41 42 "gorm.io/gorm/logger" 42 43 43 44 . "github.com/whyrusleeping/konbini/models" 44 45 ) 45 - 46 - var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{ 47 - Name: "handle_op_duration", 48 - Help: "A histogram of op handling durations", 49 - Buckets: prometheus.ExponentialBuckets(1, 2, 15), 50 - }, []string{"op", "collection"}) 51 46 52 47 var firehoseCursorGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ 53 48 Name: "firehose_cursor", ··· 134 129 db.AutoMigrate(Image{}) 135 130 db.AutoMigrate(PostGate{}) 136 131 db.AutoMigrate(StarterPack{}) 137 - db.AutoMigrate(SyncInfo{}) 132 + db.AutoMigrate(backend.SyncInfo{}) 138 133 db.AutoMigrate(Notification{}) 139 134 db.AutoMigrate(SequenceTracker{}) 135 + db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)") 136 + db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)") 140 137 141 138 ctx := context.TODO() 142 139 143 - rc, _ := lru.New2Q[string, *Repo](1_000_000) 144 - pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 145 - revc, _ := lru.New2Q[uint, string](1_000_000) 146 - 147 140 cfg, err := pgxpool.ParseConfig(cctx.String("db-url")) 148 141 if err != nil { 149 142 return err ··· 206 199 dir: dir, 207 200 208 201 missingRecords: make(chan MissingRecord, 1024), 202 + db: db, 209 203 } 210 204 fmt.Println("MY DID: ", s.mydid) 211 205 212 - pgb := &PostgresBackend{ 213 - relevantDids: make(map[string]bool), 214 - s: s, 215 - db: db, 216 - postInfoCache: pc, 217 - repoCache: rc, 218 - revCache: revc, 219 - pgx: pool, 206 + pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, nil) 207 + if err != nil { 208 + return err 220 209 } 210 + 221 211 s.backend = pgb 222 212 223 - myrepo, err := s.backend.getOrCreateRepo(ctx, mydid) 213 + myrepo, err := s.backend.GetOrCreateRepo(ctx, mydid) 224 214 if err != nil { 225 215 return fmt.Errorf("failed to get repo record for our own did: %w", err) 226 216 } 227 217 s.myrepo = myrepo 228 218 229 - if err := s.backend.loadRelevantDids(); err != nil { 219 + if err := s.backend.LoadRelevantDids(); err != nil { 230 220 return fmt.Errorf("failed to load relevant dids set: %w", err) 231 221 } 232 222 ··· 264 254 } 265 255 266 256 type Server struct { 267 - backend *PostgresBackend 257 + backend *backend.PostgresBackend 268 258 269 259 dir identity.Directory 270 260 ··· 277 267 278 268 mpLk sync.Mutex 279 269 missingRecords chan MissingRecord 270 + 271 + db *gorm.DB 280 272 } 281 273 282 274 func (s *Server) getXrpcClient() (*xrpclib.Client, error) { ··· 332 324 s.lastSeq = evt.Seq 333 325 334 326 if evt.Seq%1000 == 0 { 335 - if err := storeLastSeq(s.backend.db, "firehose_seq", evt.Seq); err != nil { 327 + if err := storeLastSeq(s.db, "firehose_seq", evt.Seq); err != nil { 336 328 fmt.Println("failed to store seqno: ", err) 337 329 } 338 330 } ··· 392 384 return resp.DID.String(), nil 393 385 } 394 386 395 - const ( 396 - NotifKindReply = "reply" 397 - NotifKindLike = "like" 398 - NotifKindMention = "mention" 399 - NotifKindRepost = "repost" 400 - ) 401 - 402 - func (s *Server) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error { 403 - return s.backend.db.Create(&Notification{ 404 - For: forUser, 405 - Author: author, 406 - Source: recordUri, 407 - SourceCid: recordCid.String(), 408 - Kind: kind, 409 - }).Error 410 - } 411 - 412 387 func (s *Server) rescanRepo(ctx context.Context, did string) error { 413 388 resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 414 389 if err != nil { 415 390 return err 416 391 } 417 392 418 - s.backend.addRelevantDid(did) 393 + s.backend.AddRelevantDid(did) 419 394 420 395 c := &xrpclib.Client{ 421 396 Host: resp.PDSEndpoint(),
+6 -6
missing.go
··· 97 97 } 98 98 99 99 func (s *Server) fetchMissingProfile(ctx context.Context, did string) error { 100 - s.backend.addRelevantDid(did) 100 + s.backend.AddRelevantDid(did) 101 101 102 - repo, err := s.backend.getOrCreateRepo(ctx, did) 102 + repo, err := s.backend.GetOrCreateRepo(ctx, did) 103 103 if err != nil { 104 104 return err 105 105 } ··· 146 146 collection := puri.Collection().String() 147 147 rkey := puri.RecordKey().String() 148 148 149 - s.backend.addRelevantDid(did) 149 + s.backend.AddRelevantDid(did) 150 150 151 - repo, err := s.backend.getOrCreateRepo(ctx, did) 151 + repo, err := s.backend.GetOrCreateRepo(ctx, did) 152 152 if err != nil { 153 153 return err 154 154 } ··· 194 194 did := puri.Authority().String() 195 195 collection := puri.Collection().String() 196 196 rkey := puri.RecordKey().String() 197 - s.backend.addRelevantDid(did) 197 + s.backend.AddRelevantDid(did) 198 198 199 - repo, err := s.backend.getOrCreateRepo(ctx, did) 199 + repo, err := s.backend.GetOrCreateRepo(ctx, did) 200 200 if err != nil { 201 201 return err 202 202 }
-446
pgbackend.go
··· 1 - package main 2 - 3 - import ( 4 - "context" 5 - "errors" 6 - "fmt" 7 - "strings" 8 - "time" 9 - 10 - "github.com/bluesky-social/indigo/api/atproto" 11 - "github.com/bluesky-social/indigo/api/bsky" 12 - "github.com/bluesky-social/indigo/atproto/syntax" 13 - "github.com/bluesky-social/indigo/util" 14 - "github.com/jackc/pgx/v5" 15 - "github.com/jackc/pgx/v5/pgconn" 16 - "github.com/whyrusleeping/market/models" 17 - "gorm.io/gorm" 18 - "gorm.io/gorm/clause" 19 - "gorm.io/gorm/logger" 20 - 21 - . "github.com/whyrusleeping/konbini/models" 22 - ) 23 - 24 - func (b *PostgresBackend) DidToID(ctx context.Context, did string) (uint, error) { 25 - r, err := b.getOrCreateRepo(ctx, did) 26 - if err != nil { 27 - return 0, err 28 - } 29 - 30 - return r.ID, nil 31 - } 32 - 33 - func (b *PostgresBackend) getOrCreateRepo(ctx context.Context, did string) (*Repo, error) { 34 - r, ok := b.repoCache.Get(did) 35 - if !ok { 36 - b.reposLk.Lock() 37 - 38 - r, ok = b.repoCache.Get(did) 39 - if !ok { 40 - r = &Repo{} 41 - r.Did = did 42 - b.repoCache.Add(did, r) 43 - } 44 - 45 - b.reposLk.Unlock() 46 - } 47 - 48 - r.Lk.Lock() 49 - defer r.Lk.Unlock() 50 - if r.Setup { 51 - return r, nil 52 - } 53 - 54 - row := b.pgx.QueryRow(ctx, "SELECT id, created_at, did FROM repos WHERE did = $1", did) 55 - 56 - err := row.Scan(&r.ID, &r.CreatedAt, &r.Did) 57 - if err == nil { 58 - // found it! 59 - r.Setup = true 60 - return r, nil 61 - } 62 - 63 - if err != pgx.ErrNoRows { 64 - return nil, err 65 - } 66 - 67 - r.Did = did 68 - if err := b.db.Create(r).Error; err != nil { 69 - return nil, err 70 - } 71 - 72 - r.Setup = true 73 - 74 - return r, nil 75 - } 76 - 77 - func (b *PostgresBackend) getOrCreateList(ctx context.Context, uri string) (*List, error) { 78 - puri, err := util.ParseAtUri(uri) 79 - if err != nil { 80 - return nil, err 81 - } 82 - 83 - r, err := b.getOrCreateRepo(ctx, puri.Did) 84 - if err != nil { 85 - return nil, err 86 - } 87 - 88 - // TODO: needs upsert treatment when we actually find the list 89 - var list List 90 - if err := b.db.FirstOrCreate(&list, map[string]any{ 91 - "author": r.ID, 92 - "rkey": puri.Rkey, 93 - }).Error; err != nil { 94 - return nil, err 95 - } 96 - return &list, nil 97 - } 98 - 99 - type cachedPostInfo struct { 100 - ID uint 101 - Author uint 102 - } 103 - 104 - func (b *PostgresBackend) postIDForUri(ctx context.Context, uri string) (uint, error) { 105 - // getPostByUri implicitly fills the cache 106 - p, err := b.postInfoForUri(ctx, uri) 107 - if err != nil { 108 - return 0, err 109 - } 110 - 111 - return p.ID, nil 112 - } 113 - 114 - func (b *PostgresBackend) postInfoForUri(ctx context.Context, uri string) (cachedPostInfo, error) { 115 - v, ok := b.postInfoCache.Get(uri) 116 - if ok { 117 - return v, nil 118 - } 119 - 120 - // getPostByUri implicitly fills the cache 121 - p, err := b.getOrCreatePostBare(ctx, uri) 122 - if err != nil { 123 - return cachedPostInfo{}, err 124 - } 125 - 126 - return cachedPostInfo{ID: p.ID, Author: p.Author}, nil 127 - } 128 - 129 - func (b *PostgresBackend) tryLoadPostInfo(ctx context.Context, uid uint, rkey string) (*Post, error) { 130 - var p Post 131 - q := "SELECT id, author FROM posts WHERE author = $1 AND rkey = $2" 132 - if err := b.pgx.QueryRow(ctx, q, uid, rkey).Scan(&p.ID, &p.Author); err != nil { 133 - if errors.Is(err, pgx.ErrNoRows) { 134 - return nil, nil 135 - } 136 - return nil, err 137 - } 138 - 139 - return &p, nil 140 - } 141 - 142 - func (b *PostgresBackend) getOrCreatePostBare(ctx context.Context, uri string) (*Post, error) { 143 - puri, err := util.ParseAtUri(uri) 144 - if err != nil { 145 - return nil, err 146 - } 147 - 148 - r, err := b.getOrCreateRepo(ctx, puri.Did) 149 - if err != nil { 150 - return nil, err 151 - } 152 - 153 - post, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey) 154 - if err != nil { 155 - return nil, err 156 - } 157 - 158 - if post == nil { 159 - post = &Post{ 160 - Rkey: puri.Rkey, 161 - Author: r.ID, 162 - NotFound: true, 163 - } 164 - 165 - err := b.pgx.QueryRow(ctx, "INSERT INTO posts (rkey, author, not_found) VALUES ($1, $2, $3) RETURNING id", puri.Rkey, r.ID, true).Scan(&post.ID) 166 - if err != nil { 167 - pgErr, ok := err.(*pgconn.PgError) 168 - if !ok || pgErr.Code != "23505" { 169 - return nil, err 170 - } 171 - 172 - out, err := b.tryLoadPostInfo(ctx, r.ID, puri.Rkey) 173 - if err != nil { 174 - return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err) 175 - } 176 - if out == nil { 177 - return nil, fmt.Errorf("postgres is lying to us: %d %s", r.ID, puri.Rkey) 178 - } 179 - 180 - post = out 181 - } 182 - 183 - } 184 - 185 - b.postInfoCache.Add(uri, cachedPostInfo{ 186 - ID: post.ID, 187 - Author: post.Author, 188 - }) 189 - 190 - return post, nil 191 - } 192 - 193 - func (b *PostgresBackend) getPostByUri(ctx context.Context, uri string, fields string) (*Post, error) { 194 - puri, err := util.ParseAtUri(uri) 195 - if err != nil { 196 - return nil, err 197 - } 198 - 199 - r, err := b.getOrCreateRepo(ctx, puri.Did) 200 - if err != nil { 201 - return nil, err 202 - } 203 - 204 - q := "SELECT " + fields + " FROM posts WHERE author = ? AND rkey = ?" 205 - 206 - var post Post 207 - if err := b.db.Raw(q, r.ID, puri.Rkey).Scan(&post).Error; err != nil { 208 - return nil, err 209 - } 210 - 211 - if post.ID == 0 { 212 - post.Rkey = puri.Rkey 213 - post.Author = r.ID 214 - post.NotFound = true 215 - 216 - if err := b.db.Session(&gorm.Session{ 217 - Logger: logger.Default.LogMode(logger.Silent), 218 - }).Create(&post).Error; err != nil { 219 - if !errors.Is(err, gorm.ErrDuplicatedKey) { 220 - return nil, err 221 - } 222 - if err := b.db.Find(&post, "author = ? AND rkey = ?", r.ID, puri.Rkey).Error; err != nil { 223 - return nil, fmt.Errorf("got duplicate post and still couldnt find it: %w", err) 224 - } 225 - } 226 - 227 - } 228 - 229 - b.postInfoCache.Add(uri, cachedPostInfo{ 230 - ID: post.ID, 231 - Author: post.Author, 232 - }) 233 - 234 - return &post, nil 235 - } 236 - 237 - func (b *PostgresBackend) revForRepo(rr *Repo) (string, error) { 238 - lrev, ok := b.revCache.Get(rr.ID) 239 - if ok { 240 - return lrev, nil 241 - } 242 - 243 - var rev string 244 - if err := b.pgx.QueryRow(context.TODO(), "SELECT COALESCE(rev, '') FROM sync_infos WHERE repo = $1", rr.ID).Scan(&rev); err != nil { 245 - if errors.Is(err, pgx.ErrNoRows) { 246 - return "", nil 247 - } 248 - return "", err 249 - } 250 - 251 - if rev != "" { 252 - b.revCache.Add(rr.ID, rev) 253 - } 254 - return rev, nil 255 - } 256 - 257 - func (b *PostgresBackend) ensureFollowsScraped(ctx context.Context, user string) error { 258 - r, err := b.getOrCreateRepo(ctx, user) 259 - if err != nil { 260 - return err 261 - } 262 - 263 - var si SyncInfo 264 - if err := b.db.Find(&si, "repo = ?", r.ID).Error; err != nil { 265 - return err 266 - } 267 - 268 - // not found 269 - if si.Repo == 0 { 270 - if err := b.db.Create(&SyncInfo{ 271 - Repo: r.ID, 272 - }).Error; err != nil { 273 - return err 274 - } 275 - } 276 - 277 - if si.FollowsSynced { 278 - return nil 279 - } 280 - 281 - var follows []Follow 282 - var cursor string 283 - for { 284 - resp, err := atproto.RepoListRecords(ctx, b.s.client, "app.bsky.graph.follow", cursor, 100, b.s.mydid, false) 285 - if err != nil { 286 - return err 287 - } 288 - 289 - for _, rec := range resp.Records { 290 - if fol, ok := rec.Value.Val.(*bsky.GraphFollow); ok { 291 - fr, err := b.getOrCreateRepo(ctx, fol.Subject) 292 - if err != nil { 293 - return err 294 - } 295 - 296 - puri, err := syntax.ParseATURI(rec.Uri) 297 - if err != nil { 298 - return err 299 - } 300 - 301 - follows = append(follows, Follow{ 302 - Created: time.Now(), 303 - Indexed: time.Now(), 304 - Rkey: puri.RecordKey().String(), 305 - Author: r.ID, 306 - Subject: fr.ID, 307 - }) 308 - } 309 - } 310 - 311 - if resp.Cursor == nil || len(resp.Records) == 0 { 312 - break 313 - } 314 - cursor = *resp.Cursor 315 - } 316 - 317 - if err := b.db.Clauses(clause.OnConflict{DoNothing: true}).CreateInBatches(follows, 200).Error; err != nil { 318 - return err 319 - } 320 - 321 - if err := b.db.Model(SyncInfo{}).Where("repo = ?", r.ID).Update("follows_synced", true).Error; err != nil { 322 - return err 323 - } 324 - 325 - fmt.Println("Got follows: ", len(follows)) 326 - 327 - return nil 328 - } 329 - 330 - func (b *PostgresBackend) loadRelevantDids() error { 331 - ctx := context.TODO() 332 - 333 - if err := b.ensureFollowsScraped(ctx, b.s.mydid); err != nil { 334 - return fmt.Errorf("failed to scrape follows: %w", err) 335 - } 336 - 337 - r, err := b.getOrCreateRepo(ctx, b.s.mydid) 338 - if err != nil { 339 - return err 340 - } 341 - 342 - var dids []string 343 - if err := b.db.Raw("select did from follows left join repos on follows.subject = repos.id where follows.author = ?", r.ID).Scan(&dids).Error; err != nil { 344 - return err 345 - } 346 - 347 - b.relevantDids[b.s.mydid] = true 348 - for _, d := range dids { 349 - fmt.Println("adding did: ", d) 350 - b.relevantDids[d] = true 351 - } 352 - 353 - return nil 354 - } 355 - 356 - type SyncInfo struct { 357 - Repo uint `gorm:"index"` 358 - FollowsSynced bool 359 - Rev string 360 - } 361 - 362 - func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) { 363 - var id uint 364 - var notfound bool 365 - if err := b.pgx.QueryRow(ctx, "SELECT id, not_found FROM posts WHERE author = $1 AND rkey = $2", repo.ID, rkey).Scan(&id, &notfound); err != nil { 366 - if errors.Is(err, pgx.ErrNoRows) { 367 - return false, nil 368 - } 369 - return false, err 370 - } 371 - 372 - if id != 0 && !notfound { 373 - return true, nil 374 - } 375 - 376 - return false, nil 377 - } 378 - 379 - func (b *PostgresBackend) addRelevantDid(did string) { 380 - b.rdLk.Lock() 381 - defer b.rdLk.Unlock() 382 - b.relevantDids[did] = true 383 - } 384 - 385 - func (b *PostgresBackend) didIsRelevant(did string) bool { 386 - b.rdLk.Lock() 387 - defer b.rdLk.Unlock() 388 - return b.relevantDids[did] 389 - } 390 - 391 - func (b *PostgresBackend) anyRelevantIdents(idents ...string) bool { 392 - for _, id := range idents { 393 - if strings.HasPrefix(id, "did:") { 394 - if b.didIsRelevant(id) { 395 - return true 396 - } 397 - } else if strings.HasPrefix(id, "at://") { 398 - puri, err := syntax.ParseATURI(id) 399 - if err != nil { 400 - continue 401 - } 402 - 403 - if b.didIsRelevant(puri.Authority().String()) { 404 - return true 405 - } 406 - } 407 - } 408 - 409 - return false 410 - } 411 - 412 - func (b *PostgresBackend) getRepoByID(ctx context.Context, id uint) (*models.Repo, error) { 413 - var r models.Repo 414 - if err := b.db.Find(&r, "id = ?", id).Error; err != nil { 415 - return nil, err 416 - } 417 - 418 - return &r, nil 419 - } 420 - 421 - func (b *PostgresBackend) TrackMissingRecord(identifier string, wait bool) { 422 - b.s.addMissingRecord(context.TODO(), MissingRecord{ 423 - Type: inferRecordType(identifier), 424 - Identifier: identifier, 425 - Wait: wait, 426 - }) 427 - } 428 - 429 - // inferRecordType determines the record type based on the identifier format 430 - func inferRecordType(identifier string) MissingRecordType { 431 - if strings.HasPrefix(identifier, "did:") { 432 - return MissingRecordTypeProfile 433 - } 434 - 435 - if strings.HasPrefix(identifier, "at://") { 436 - if strings.Contains(identifier, "/app.bsky.feed.post/") { 437 - return MissingRecordTypePost 438 - } 439 - if strings.Contains(identifier, "/app.bsky.feed.generator/") { 440 - return MissingRecordTypeFeedGenerator 441 - } 442 - } 443 - 444 - // Default to post if we can't determine 445 - return MissingRecordTypePost 446 - }
+36 -13
xrpc/feed/getAuthorFeed.go
··· 10 10 "time" 11 11 12 12 "github.com/bluesky-social/indigo/api/bsky" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 13 14 "github.com/labstack/echo/v4" 14 15 "github.com/whyrusleeping/konbini/hydration" 15 16 "github.com/whyrusleeping/konbini/views" ··· 140 141 go func(i int, row postRow) { 141 142 defer wg.Done() 142 143 143 - postInfo, err := hydrator.HydratePost(ctx, row.URI, viewer) 144 + puri, err := syntax.ParseATURI(row.URI) 144 145 if err != nil { 145 - if strings.Contains(err.Error(), "post not found") { 146 - hydrator.AddMissingRecord(row.URI, true) 147 - postInfo, err = hydrator.HydratePost(ctx, row.URI, viewer) 148 - if err != nil { 149 - slog.Error("failed to hydrate post after fetch missing", "uri", row.URI, "error", err) 146 + slog.Error("row had invalid uri", "uri", row.URI, "error", err) 147 + return 148 + } 149 + 150 + var subwg sync.WaitGroup 151 + 152 + var postInfo *hydration.PostInfo 153 + subwg.Go(func() { 154 + pi, err := hydrator.HydratePost(ctx, row.URI, viewer) 155 + if err != nil { 156 + if strings.Contains(err.Error(), "post not found") { 157 + hydrator.AddMissingRecord(row.URI, true) 158 + pi, err = hydrator.HydratePost(ctx, row.URI, viewer) 159 + if err != nil { 160 + slog.Error("failed to hydrate post after fetch missing", "uri", row.URI, "error", err) 161 + return 162 + } 163 + } else { 164 + slog.Warn("failed to hydrate post", "uri", row.URI, "error", err) 150 165 return 151 166 } 152 - } else { 153 - slog.Warn("failed to hydrate post", "uri", row.URI, "error", err) 167 + } 168 + postInfo = pi 169 + }) 170 + 171 + var authorInfo *hydration.ActorInfo 172 + subwg.Go(func() { 173 + ai, err := hydrator.HydrateActor(ctx, puri.Authority().String()) 174 + if err != nil { 175 + hydrator.AddMissingRecord(postInfo.Author, false) 176 + slog.Warn("failed to hydrate author", "did", postInfo.Author, "error", err) 154 177 return 155 178 } 156 - } 179 + authorInfo = ai 180 + }) 157 181 158 - authorInfo, err := hydrator.HydrateActor(ctx, postInfo.Author) 159 - if err != nil { 160 - hydrator.AddMissingRecord(postInfo.Author, false) 161 - slog.Warn("failed to hydrate author", "did", postInfo.Author, "error", err) 182 + subwg.Wait() 183 + 184 + if postInfo == nil || authorInfo == nil { 162 185 return 163 186 } 164 187
+5 -1
xrpc/server.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "context" 4 5 "log/slog" 5 6 "net/http" 6 7 7 8 "github.com/bluesky-social/indigo/atproto/identity" 8 9 "github.com/labstack/echo/v4" 9 10 "github.com/labstack/echo/v4/middleware" 11 + "github.com/whyrusleeping/konbini/backend" 10 12 "github.com/whyrusleeping/konbini/hydration" 13 + "github.com/whyrusleeping/konbini/models" 11 14 "github.com/whyrusleeping/konbini/xrpc/actor" 12 15 "github.com/whyrusleeping/konbini/xrpc/feed" 13 16 "github.com/whyrusleeping/konbini/xrpc/graph" ··· 32 35 // Add methods as needed for data access 33 36 34 37 TrackMissingRecord(identifier string, wait bool) 38 + GetOrCreateRepo(ctx context.Context, did string) (*models.Repo, error) 35 39 } 36 40 37 41 // NewServer creates a new XRPC server 38 - func NewServer(db *gorm.DB, dir identity.Directory, backend Backend) *Server { 42 + func NewServer(db *gorm.DB, dir identity.Directory, backend *backend.PostgresBackend) *Server { 39 43 e := echo.New() 40 44 e.HidePort = true 41 45 e.HideBanner = true
+1
xrpc/unspecced/getPostThreadV2.go
··· 17 17 // HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2 18 18 func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 19 19 ctx := c.Request().Context() 20 + ctx = context.WithValue(ctx, "auto-fetch", true) 20 21 21 22 // Parse parameters 22 23 anchorRaw := c.QueryParam("anchor")