a love letter to tangled (android, iOS, and a search API)

feat: resync

refactor server to use register funcs

+390 -63
+1
docs/README.md
···
  - [`reference/api.md`](reference/api.md) — Go search API service
  - [`reference/app.md`](reference/app.md) — Ionic Vue mobile app
  - [`reference/lexicons.md`](reference/lexicons.md) — Tangled AT Protocol record types
+ - [`reference/resync.md`](reference/resync.md) — Backfill and repo-resync recovery playbook

  ## Specs
+185
docs/reference/resync.md
···
+ ---
+ title: Backfill & Resync Playbook
+ updated: 2026-03-25
+ ---
+
+ Twister's search index has three recovery paths. Choose based on what broke.
+
+ | Situation | Recovery path |
+ | ----------------------------------------------------- | -------------------------------------------- |
+ | FTS index corrupted or drifted from stored documents | `twister reindex` |
+ | Documents missing — never received via Tap | `twister backfill` + let the indexer consume |
+ | Documents missing — received but fields empty/wrong | `twister enrich` |
+ | Full index loss — DB dropped or migrated | backfill then reindex then enrich |
+ | Tap cursor too far ahead — events skipped after a gap | cursor reset via `sync_state` table |
+
+ ---
+
+ ## Paths Overview
+
+ **Tap** is the authoritative ingest and backfill path. Documents reach the index
+ when the `indexer` consumes events from Tap. Completeness depends on which DIDs
+ Tap is tracking.
+
+ **Read-through indexing** closes gaps on demand: when the API fetches a record
+ not yet in the index, it enqueues a background job. This supplements Tap but is
+ not a substitute for it.
+
+ **JetStream** feeds only the activity cache (`/activity`). It does not contribute
+ to the search index.
+
+ ---
+
+ ## Commands
+
+ ### `twister indexer`
+
+ Runs the Tap consumer. Must be running continuously for real-time indexing.
+ Persists cursor to `sync_state` table under consumer name `indexer-tap-v1`.
+
+ ### `twister backfill`
+
+ Discovers users via follow graph from seed DIDs/handles, checks Tap status for
+ each, and registers untracked repos with Tap `/repos/add`.
+
+ ```sh
+ # dry-run first
+ twister backfill --seeds seeds.txt --max-hops 2 --dry-run
+
+ # real run
+ twister backfill --seeds seeds.txt --max-hops 2 \
+   --concurrency 5 --batch-size 10 --batch-delay 1s
+ ```
+
+ Safe to re-run. Discovery deduplicates and `repos/add` is idempotent.
+
+ ### `twister reindex`
+
+ Re-upserts stored documents into the FTS table and runs `optimize`. Does not
+ re-fetch from upstream — only re-processes what is already in the DB.
+
+ ```sh
+ twister reindex # all documents
+ twister reindex --collection sh.tangled.repo
+ twister reindex --did did:plc:abc123
+ twister reindex --dry-run # preview without writing
+ ```
+
+ Run this when: FTS results are stale after a schema migration, after a bulk
+ document import, or whenever search quality seems inconsistent with stored data.
+
+ ### `twister enrich`
+
+ Resolves missing `author_handle`, `repo_name`, and `web_url` via XRPC for
+ documents already in the DB.
+
+ ```sh
+ twister enrich # all documents
+ twister enrich --collection sh.tangled.repo.issue
+ twister enrich --did did:plc:abc123
+ twister enrich --dry-run
+ ```
+
+ Run this when: search results show documents with empty author handles, or
+ after deploying enrichment logic changes.
+
+ ---
+
+ ## Scenario Playbooks
+
+ ### FTS index out of sync
+
+ Documents exist in the DB but search returns wrong/stale results.
+
+ ```sh
+ twister reindex --dry-run # confirm scope
+ twister reindex # re-upsert + FTS optimize
+ ```
+
+ Verify with `GET /search?q=<known-term>`.
+
+ ### Documents missing from search
+
+ Fetch a known record directly. If it returns from `/actors/{handle}/repos/{repo}`
+ but does not appear in `/search`, the document was never indexed.
+
+ 1. Check if the DID is tracked by Tap. If not, run `backfill`:
+
+    ```sh
+    twister backfill --seeds <handle-or-did> --max-hops 0
+    ```
+
+ 2. Once Tap is tracking the DID, the `indexer` will deliver historical events.
+    Monitor progress via `GET /admin/status` (requires `ENABLE_ADMIN_ENDPOINTS=true`).
+
+ 3. If you need the record indexed immediately, fetch it through the API — the
+    read-through indexer will enqueue it automatically.
+
+ ### Enrichment gaps
+
+ Documents appear in search but `author_handle` or `repo_name` is empty.
+
+ ```sh
+ twister enrich --dry-run # preview what would be resolved
+ twister enrich # apply
+ twister reindex # re-sync FTS after field updates
+ ```
+
+ ### Full index recovery
+
+ Use this sequence after a DB drop, migration to a new Turso database, or other
+ full-loss event.
+
+ 1. Confirm migrations ran: `twister api --local` performs `store.Migrate` on startup.
+ 2. Register repos with Tap:
+
+    ```sh
+    twister backfill --seeds seeds.txt --max-hops 2 --dry-run
+    twister backfill --seeds seeds.txt --max-hops 2
+    ```
+
+ 3. Start the indexer and let it consume: `twister indexer`
+ 4. Once backfill is complete, enrich fields and re-sync FTS:
+
+    ```sh
+    twister enrich
+    twister reindex
+    ```
+
+ 5. Verify: `GET /admin/status` for cursor progress, `GET /readyz` for DB health.
+
+ ### Tap cursor reset
+
+ If the indexer cursor is ahead of what Tap will deliver (e.g., after a Tap
+ instance reset), events will be skipped until the cursor catches up.
+
+ To reset the cursor and reprocess from the beginning of Tap's retention window:
+
+ ```sql
+ DELETE FROM sync_state WHERE consumer_name = 'indexer-tap-v1';
+ ```
+
+ Then restart the `indexer`. It will start from the head of the stream and
+ process all events Tap delivers.
+
+ > **Note:** This does not cause duplicate documents — `UpsertDocument` is
+ > idempotent. It may reprocess a large backlog depending on Tap retention.
+
+ ---
+
+ ## Checking Status
+
+ With `ENABLE_ADMIN_ENDPOINTS=true`:
+
+ ```sh
+ curl -H "Authorization: Bearer $ADMIN_AUTH_TOKEN" \
+   http://localhost:8080/admin/status
+ ```
+
+ Response includes:
+
+ - `tap.cursor` — last Tap event ID processed by the indexer
+ - `tap.updated_at` — when the cursor was last advanced
+ - `jetstream.cursor` — JetStream timestamp cursor (activity cache only)
+ - `documents` — total searchable document count
+ - `pending_jobs` — read-through indexing jobs not yet processed
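The status check at the end of the playbook can also be scripted. The following is a minimal sketch of a Go client for `GET /admin/status`, not code from this commit. The names `adminStatus`, `syncState`, `decodeStatus`, and `fetchStatus` are hypothetical, the integer types for `documents` and `pending_jobs` are an assumption (the diff does not show what the store's count methods return), and the `httptest` server in `main` is only a stand-in so the example runs without a live Twister instance.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// syncState mirrors the per-consumer object returned under "tap" and "jetstream".
type syncState struct {
	Cursor        string `json:"cursor"`
	HighWaterMark string `json:"high_water_mark,omitempty"`
	UpdatedAt     string `json:"updated_at,omitempty"`
}

// adminStatus mirrors the /admin/status response body.
// Assumption: both counters fit in an int64.
type adminStatus struct {
	Tap         syncState `json:"tap"`
	JetStream   syncState `json:"jetstream"`
	Documents   int64     `json:"documents"`
	PendingJobs int64     `json:"pending_jobs"`
}

// decodeStatus parses a raw /admin/status JSON body.
func decodeStatus(body []byte) (adminStatus, error) {
	var st adminStatus
	err := json.Unmarshal(body, &st)
	return st, err
}

// fetchStatus calls GET {baseURL}/admin/status, sending a bearer token when one is given.
func fetchStatus(ctx context.Context, baseURL, token string) (adminStatus, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/admin/status", nil)
	if err != nil {
		return adminStatus{}, err
	}
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return adminStatus{}, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return adminStatus{}, fmt.Errorf("admin status: unexpected HTTP %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return adminStatus{}, err
	}
	return decodeStatus(body)
}

func main() {
	// Stand-in server with a made-up payload; replace srv.URL with the real base URL.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		io.WriteString(w, `{"tap":{"cursor":"42"},"jetstream":{"cursor":"1700000000"},"documents":10,"pending_jobs":2}`)
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	st, err := fetchStatus(ctx, srv.URL, "example-token")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("tap cursor=%s documents=%d pending_jobs=%d\n", st.Tap.Cursor, st.Documents, st.PendingJobs)
}
```

Polling this during a full index recovery and watching `tap.cursor` advance while `pending_jobs` drains is one way to judge when it is reasonable to move on to `twister enrich` and `twister reindex`.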
+2 -2
docs/roadmap.md
···
  - [x] Add a JetStream cache consumer with a persisted timestamp cursor
  - [x] Seed the JetStream cursor to `now - 24h` on first boot and rewind slightly on reconnect
  - [x] Store and serve bounded recent activity from the local cache
- - [ ] Keep Tap as the authoritative indexing and bulk backfill path
- - [ ] Define a controlled backfill and repo-resync playbook for recovery (`docs/references/resync.md`)
+ - [x] Keep Tap as the authoritative indexing and bulk backfill path
+ - [x] Define a controlled backfill and repo-resync playbook for recovery (`docs/reference/resync.md`)

  ## API: Constellation Integration
+10
packages/api/README.md
···
  | `GET /identity/did/{did}` | `https://plc.directory/{did}` or `/.well-known/did.json` |
  | `GET /backlinks/count` | Constellation `getBacklinksCount` (cached) |
  | `WS /activity/stream` | `wss://jetstream2.us-east.bsky.network/subscribe` |
+
+ ## Admin endpoints
+
+ Available when `ENABLE_ADMIN_ENDPOINTS=true`. Require `Authorization: Bearer <ADMIN_AUTH_TOKEN>` when
+ `ADMIN_AUTH_TOKEN` is set.
+
+ | Route | Description |
+ | ---------------------- | -------------------------------------------------------- |
+ | `GET /admin/status` | Tap cursor, JetStream cursor, document count, job queue |
+ | `POST /admin/reindex` | Re-sync all (or filtered) documents into the FTS index |
+74 -61
packages/api/internal/api/api.go
···
  	"tangled.org/desertthunder.dev/twister/internal/reindex"
  	"tangled.org/desertthunder.dev/twister/internal/search"
  	"tangled.org/desertthunder.dev/twister/internal/store"
- 	"tangled.org/desertthunder.dev/twister/internal/view"
  	"tangled.org/desertthunder.dev/twister/internal/xrpc"
  )

···
  }

  // Handler returns the HTTP handler with all routes registered.
- //
- // TODO: refactor this to have a func router() that returns [http.Handler] (mux),
- // then registerXXX routes
  func (s *Server) Handler() http.Handler {
- 	mux := http.NewServeMux()
-
- 	mux.HandleFunc("GET /healthz", s.handleHealthz)
- 	mux.HandleFunc("GET /readyz", s.handleReadyz)
- 	mux.HandleFunc("GET /oauth/client-metadata.json", s.handleOAuthClientMetadata)
- 	mux.HandleFunc("GET /search", s.handleSearch)
- 	mux.HandleFunc("GET /search/keyword", s.handleSearchKeyword)
-
- 	mux.HandleFunc("GET /documents/{id}", s.handleGetDocument)
- 	mux.HandleFunc("GET /profiles/{did}/summary", s.handleProfileSummary)
-
- 	mux.HandleFunc("GET /backlinks/count", s.handleBacklinksCount)
- 	mux.HandleFunc("GET /activity", s.handleActivity)
- 	mux.HandleFunc("GET /activity/stream", s.handleActivityStream)
- 	mux.HandleFunc("GET /identity/resolve", s.handleResolveHandle)
- 	mux.HandleFunc("GET /identity/did/{did}", s.handleDidDocument)
- 	mux.HandleFunc("GET /xrpc/knot/{knotHost}/{nsid}", s.handleKnotProxy)
- 	mux.HandleFunc("GET /xrpc/pds/{pds}/{nsid}", s.handlePdsProxy)
- 	mux.HandleFunc("GET /xrpc/bsky/{nsid}", s.handleBskyProxy)
-
- 	mux.HandleFunc("GET /actors/{handle}", s.handleGetActor)
- 	mux.HandleFunc("GET /actors/{handle}/repos", s.handleListActorRepos)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}", s.handleGetActorRepo)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/tree", s.handleRepoTree)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/blob", s.handleRepoBlob)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/log", s.handleRepoLog)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/branches", s.handleRepoBranches)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/default-branch", s.handleRepoDefaultBranch)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/languages", s.handleRepoLanguages)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/tags", s.handleRepoTags)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/diff", s.handleRepoDiff)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/compare", s.handleRepoCompare)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/issues", s.handleRepoIssues)
- 	mux.HandleFunc("GET /actors/{handle}/repos/{repo}/pulls", s.handleRepoPulls)
- 	mux.HandleFunc("GET /actors/{handle}/issues", s.handleActorIssues)
- 	mux.HandleFunc("GET /actors/{handle}/pulls", s.handleActorPulls)
- 	mux.HandleFunc("GET /actors/{handle}/following", s.handleActorFollowing)
- 	mux.HandleFunc("GET /actors/{handle}/strings", s.handleActorStrings)
- 	mux.HandleFunc("GET /issues/{handle}/{rkey}", s.handleIssueDetail)
- 	mux.HandleFunc("GET /issues/{handle}/{rkey}/comments", s.handleIssueComments)
- 	mux.HandleFunc("GET /pulls/{handle}/{rkey}", s.handlePullDetail)
- 	mux.HandleFunc("GET /pulls/{handle}/{rkey}/comments", s.handlePullComments)
-
- 	if s.cfg.EnableAdminEndpoints {
- 		mux.HandleFunc("POST /admin/reindex", s.handleAdminReindex)
- 	}
-
- 	site := view.Handler()
- 	mux.Handle("GET /static/", site)
- 	mux.Handle("GET /docs", site)
- 	mux.Handle("GET /docs/search", site)
- 	mux.Handle("GET /docs/documents", site)
- 	mux.Handle("GET /docs/health", site)
- 	mux.Handle("GET /{$}", site)
-
- 	return s.withMiddleware(mux)
+ 	return s.router()
  }

  // Run starts the HTTP server and blocks until ctx is cancelled.
···
  	writeJSON(w, http.StatusOK, summary)
  }

+ const tapConsumerName = "indexer-tap-v1"
+
+ // handleAdminStatus returns Tap cursor, JetStream cursor, document count, and pending job count.
+ // Route: GET /admin/status
+ func (s *Server) handleAdminStatus(w http.ResponseWriter, r *http.Request) {
+ 	if s.cfg.AdminAuthToken != "" {
+ 		token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
+ 		if token != s.cfg.AdminAuthToken {
+ 			writeJSON(w, http.StatusUnauthorized, errorBody("unauthorized", "invalid admin token"))
+ 			return
+ 		}
+ 	}
+
+ 	tapState, err := s.store.GetSyncState(r.Context(), tapConsumerName)
+ 	if err != nil {
+ 		s.log.Error("admin status: get tap sync state failed", slog.String("error", err.Error()))
+ 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to fetch tap sync state"))
+ 		return
+ 	}
+
+ 	jsState, err := s.store.GetSyncState(r.Context(), jetstreamConsumerName)
+ 	if err != nil {
+ 		s.log.Error("admin status: get jetstream sync state failed", slog.String("error", err.Error()))
+ 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to fetch jetstream sync state"))
+ 		return
+ 	}
+
+ 	docs, err := s.store.CountDocuments(r.Context())
+ 	if err != nil {
+ 		s.log.Error("admin status: count documents failed", slog.String("error", err.Error()))
+ 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to count documents"))
+ 		return
+ 	}
+
+ 	pending, err := s.store.CountPendingIndexingJobs(r.Context())
+ 	if err != nil {
+ 		s.log.Error("admin status: count pending jobs failed", slog.String("error", err.Error()))
+ 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to count pending jobs"))
+ 		return
+ 	}
+
+ 	type syncStateJSON struct {
+ 		Cursor        string `json:"cursor"`
+ 		HighWaterMark string `json:"high_water_mark,omitempty"`
+ 		UpdatedAt     string `json:"updated_at,omitempty"`
+ 	}
+
+ 	tapJSON := syncStateJSON{}
+ 	if tapState != nil {
+ 		tapJSON = syncStateJSON{
+ 			Cursor:        tapState.Cursor,
+ 			HighWaterMark: tapState.HighWaterMark,
+ 			UpdatedAt:     tapState.UpdatedAt,
+ 		}
+ 	}
+
+ 	jsJSON := syncStateJSON{}
+ 	if jsState != nil {
+ 		jsJSON = syncStateJSON{
+ 			Cursor:        jsState.Cursor,
+ 			HighWaterMark: jsState.HighWaterMark,
+ 			UpdatedAt:     jsState.UpdatedAt,
+ 		}
+ 	}
+
+ 	writeJSON(w, http.StatusOK, map[string]any{
+ 		"tap":          tapJSON,
+ 		"jetstream":    jsJSON,
+ 		"documents":    docs,
+ 		"pending_jobs": pending,
+ 	})
+ }
+
  // knownActivityParams is the whitelist of accepted query parameters for the activity endpoint.
  var knownActivityParams = map[string]bool{
  	"limit": true, "offset": true,
···
  		"events": out,
  	})
  }
-
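A note on the token check in `handleAdminStatus`: `strings.TrimPrefix` returns its input unchanged when the prefix is absent, so an `Authorization` header that carries the bare token without the `Bearer ` scheme also passes, and `!=` is not a constant-time comparison. One possible tightening is sketched below. It is an illustration only, not part of this commit, and the helper name `bearerTokenOK` is hypothetical.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// bearerTokenOK reports whether header is exactly "Bearer <want>".
// Hypothetical helper: it requires the scheme prefix and compares the
// token with crypto/subtle instead of the != operator.
func bearerTokenOK(header, want string) bool {
	got, ok := strings.CutPrefix(header, "Bearer ")
	if !ok {
		return false // missing or different auth scheme
	}
	return subtle.ConstantTimeCompare([]byte(got), []byte(want)) == 1
}

func main() {
	const want = "s3cret" // example value only
	for _, header := range []string{"Bearer s3cret", "s3cret", "Bearer wrong", ""} {
		fmt.Printf("%q -> %v\n", header, bearerTokenOK(header, want))
	}
}
```

`strings.CutPrefix` needs Go 1.20 or newer, which the method-and-wildcard route patterns in this codebase already imply. `subtle.ConstantTimeCompare` still returns early when the lengths differ, so this hides the token's content from timing but not its length.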
+118
packages/api/internal/api/router.go
···
+ package api
+
+ import (
+ 	"net/http"
+
+ 	"tangled.org/desertthunder.dev/twister/internal/view"
+ )
+
+ func (s *Server) router() http.Handler {
+ 	mux := http.NewServeMux()
+
+ 	mux.HandleFunc("GET /healthz", s.handleHealthz)
+ 	mux.HandleFunc("GET /readyz", s.handleReadyz)
+ 	mux.HandleFunc("GET /oauth/client-metadata.json", s.handleOAuthClientMetadata)
+
+ 	mux.HandleFunc("GET /documents/{id}", s.handleGetDocument)
+ 	mux.HandleFunc("GET /profiles/{did}/summary", s.handleProfileSummary)
+ 	mux.HandleFunc("GET /backlinks/count", s.handleBacklinksCount)
+
+ 	s.registerSearchRoutes(mux)
+ 	s.registerActivityRoutes(mux)
+ 	s.registerIdentityRoutes(mux)
+ 	s.registerXRPCRoutes(mux)
+ 	s.registerActorRoutes(mux)
+ 	s.registerIssueRoutes(mux)
+ 	s.registerPullRoutes(mux)
+ 	s.registerAdminRoutes(mux)
+ 	s.registerSiteRoutes(mux)
+
+ 	return s.withMiddleware(mux)
+ }
+
+ func (s *Server) registerSearchRoutes(mux *http.ServeMux) {
+ 	mux.HandleFunc("GET /search", s.handleSearch)
+ 	mux.HandleFunc("GET /search/keyword", s.handleSearchKeyword)
+ }
+
+ func (s *Server) registerActivityRoutes(mux *http.ServeMux) {
+ 	mux.HandleFunc("GET /activity", s.handleActivity)
+ 	mux.HandleFunc("GET /activity/stream", s.handleActivityStream)
+ }
+
+ func (s *Server) registerIdentityRoutes(mux *http.ServeMux) {
+ 	mux.HandleFunc("GET /identity/resolve", s.handleResolveHandle)
+ 	mux.HandleFunc("GET /identity/did/{did}", s.handleDidDocument)
+ }
+
+ func (s *Server) registerXRPCRoutes(mux *http.ServeMux) {
+ 	mux.HandleFunc("GET /xrpc/knot/{knotHost}/{nsid}", s.handleKnotProxy)
+ 	mux.HandleFunc("GET /xrpc/pds/{pds}/{nsid}", s.handlePdsProxy)
+ 	mux.HandleFunc("GET /xrpc/bsky/{nsid}", s.handleBskyProxy)
+ }
+
+ func (s *Server) registerActorRepoRoutes(mux *http.ServeMux) {
+ 	const base = "GET /actors/{handle}/repos/{repo}"
+
+ 	mux.HandleFunc(base, s.handleGetActorRepo)
+ 	mux.HandleFunc(base+"/tree", s.handleRepoTree)
+ 	mux.HandleFunc(base+"/blob", s.handleRepoBlob)
+ 	mux.HandleFunc(base+"/log", s.handleRepoLog)
+ 	mux.HandleFunc(base+"/branches", s.handleRepoBranches)
+ 	mux.HandleFunc(base+"/default-branch", s.handleRepoDefaultBranch)
+ 	mux.HandleFunc(base+"/languages", s.handleRepoLanguages)
+ 	mux.HandleFunc(base+"/tags", s.handleRepoTags)
+ 	mux.HandleFunc(base+"/diff", s.handleRepoDiff)
+ 	mux.HandleFunc(base+"/compare", s.handleRepoCompare)
+ 	mux.HandleFunc(base+"/issues", s.handleRepoIssues)
+ 	mux.HandleFunc(base+"/pulls", s.handleRepoPulls)
+ }
+
+ func (s *Server) registerActorRoutes(mux *http.ServeMux) {
+ 	mux.HandleFunc("GET /actors/{handle}", s.handleGetActor)
+ 	mux.HandleFunc("GET /actors/{handle}/repos", s.handleListActorRepos)
+ 	mux.HandleFunc("GET /actors/{handle}/issues", s.handleActorIssues)
+ 	mux.HandleFunc("GET /actors/{handle}/pulls", s.handleActorPulls)
+ 	mux.HandleFunc("GET /actors/{handle}/following", s.handleActorFollowing)
+ 	mux.HandleFunc("GET /actors/{handle}/strings", s.handleActorStrings)
+
+ 	s.registerActorRepoRoutes(mux)
+ }
+
+ func (s *Server) registerIssueRoutes(mux *http.ServeMux) {
+ 	const base = "GET /issues/{handle}/{rkey}"
+
+ 	mux.HandleFunc(base, s.handleIssueDetail)
+ 	mux.HandleFunc(base+"/comments", s.handleIssueComments)
+ }
+
+ func (s *Server) registerPullRoutes(mux *http.ServeMux) {
+ 	const base = "GET /pulls/{handle}/{rkey}"
+
+ 	mux.HandleFunc(base, s.handlePullDetail)
+ 	mux.HandleFunc(base+"/comments", s.handlePullComments)
+ }
+
+ func (s *Server) registerAdminRoutes(mux *http.ServeMux) {
+ 	if !s.cfg.EnableAdminEndpoints {
+ 		return
+ 	}
+ 	mux.HandleFunc("GET /admin/status", s.handleAdminStatus)
+ 	mux.HandleFunc("POST /admin/reindex", s.handleAdminReindex)
+ }
+
+ func (s *Server) registerDocsRoutes(mux *http.ServeMux, h http.Handler) {
+ 	mux.Handle("GET /docs", h)
+ 	mux.Handle("GET /docs/search", h)
+ 	mux.Handle("GET /docs/documents", h)
+ 	mux.Handle("GET /docs/health", h)
+ }
+
+ func (s *Server) registerSiteRoutes(mux *http.ServeMux) {
+ 	site := view.Handler()
+ 	mux.Handle("GET /static/", site)
+
+ 	s.registerDocsRoutes(mux, site)
+
+ 	mux.Handle("GET /{$}", site)
+ }