A locally focused bluesky appview

some parallization for the getThreadV2 api

Changed files
+60 -14
backend
hydration
xrpc
unspecced
+19
backend/backend.go
··· 43 43 repoCache *lru.TwoQueueCache[string, *Repo] 44 44 reposLk sync.Mutex 45 45 46 + didByIDCache *lru.TwoQueueCache[uint, string] 47 + 46 48 postInfoCache *lru.TwoQueueCache[string, cachedPostInfo] 47 49 } 48 50 ··· 56 58 rc, _ := lru.New2Q[string, *Repo](1_000_000) 57 59 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 58 60 revc, _ := lru.New2Q[uint, string](1_000_000) 61 + dbic, _ := lru.New2Q[uint, string](1_000_000) 59 62 60 63 b := &PostgresBackend{ 61 64 client: client, ··· 67 70 repoCache: rc, 68 71 postInfoCache: pc, 69 72 revCache: revc, 73 + didByIDCache: dbic, 70 74 } 71 75 72 76 r, err := b.GetOrCreateRepo(context.TODO(), mydid) ··· 363 367 } 364 368 365 369 return &r, nil 370 + } 371 + 372 + func (b *PostgresBackend) DidFromID(ctx context.Context, uid uint) (string, error) { 373 + val, ok := b.didByIDCache.Get(uid) 374 + if ok { 375 + return val, nil 376 + } 377 + 378 + r, err := b.GetRepoByID(ctx, uid) 379 + if err != nil { 380 + return "", err 381 + } 382 + 383 + b.didByIDCache.Add(uid, r.Did) 384 + return r.Did, nil 366 385 } 367 386 368 387 func (b *PostgresBackend) checkPostExists(ctx context.Context, repo *Repo, rkey string) (bool, error) {
+2 -2
hydration/utils.go
··· 30 30 } 31 31 32 32 func (h *Hydrator) UriForPost(ctx context.Context, p *models.Post) (string, error) { 33 - r, err := h.backend.GetRepoByID(ctx, p.Author) 33 + did, err := h.backend.DidFromID(ctx, p.Author) 34 34 if err != nil { 35 35 return "", err 36 36 } 37 37 38 - return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey), nil 38 + return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, p.Rkey), nil 39 39 }
+1
main.go
··· 135 135 db.AutoMigrate(SequenceTracker{}) 136 136 db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)") 137 137 db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)") 138 + db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)") 138 139 139 140 ctx := context.TODO() 140 141
+38 -12
xrpc/unspecced/getPostThreadV2.go
··· 7 7 "log/slog" 8 8 "net/http" 9 9 "strconv" 10 + "sync" 10 11 11 12 "github.com/bluesky-social/indigo/api/bsky" 12 13 "github.com/labstack/echo/v4" 13 14 "github.com/whyrusleeping/konbini/hydration" 14 15 "github.com/whyrusleeping/konbini/views" 15 16 "github.com/whyrusleeping/market/models" 17 + "go.opentelemetry.io/otel" 16 18 "gorm.io/gorm" 17 19 ) 20 + 21 + var tracer = otel.Tracer("xrpc/unspecced") 18 22 19 23 // HandleGetPostThreadV2 implements app.bsky.unspecced.getPostThreadV2 20 24 func HandleGetPostThreadV2(c echo.Context, db *gorm.DB, hydrator *hydration.Hydrator) error { 21 - ctx := c.Request().Context() 25 + ctx, span := tracer.Start(c.Request().Context(), "getPostThreadV2") 26 + defer span.End() 22 27 ctx = context.WithValue(ctx, "auto-fetch", true) 23 28 24 29 // Parse parameters ··· 152 157 return nil, nil 153 158 } 154 159 155 - var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem 156 - for _, child := range curnode.children { 157 - out = append(out, buildThreadItem(ctx, hydrator, child, depth+1, viewer)) 158 - if child.missing { 159 - continue 160 - } 160 + type parThreadResults struct { 161 + node *bsky.UnspeccedGetPostThreadV2_ThreadItem 162 + children []*bsky.UnspeccedGetPostThreadV2_ThreadItem 163 + } 164 + 165 + results := make([]parThreadResults, len(curnode.children)) 166 + 167 + var wg sync.WaitGroup 168 + for i := range curnode.children { 169 + ix := i 170 + wg.Go(func() { 171 + child := curnode.children[ix] 172 + 173 + results[ix].node = buildThreadItem(ctx, hydrator, child, depth+1, viewer) 174 + if child.missing { 175 + return 176 + } 177 + 178 + sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer) 179 + if err != nil { 180 + slog.Error("failed to collect replies", "node", child.uri, "error", err) 181 + return 182 + } 183 + 184 + results[ix].children = sub 185 + }) 186 + } 161 187 162 - sub, err := collectReplies(ctx, hydrator, child, depth+1, below-1, branchingFactor, sort, viewer) 163 - if err != nil { 164 - return nil, err 165 - } 188 + wg.Wait() 166 189 167 - out = append(out, sub...) 190 + var out []*bsky.UnspeccedGetPostThreadV2_ThreadItem 191 + for _, res := range results { 192 + out = append(out, res.node) 193 + out = append(out, res.children...) 168 194 } 169 195 170 196 return out, nil