a love letter to tangled (Android, iOS, and a search API)

feat: indexing job management and auditing

- add indexing audit logs
- add admin checks and improve indexing checks

+2046 -928
+34 -4
README.md
··· 63 63 just api-dev remote 64 64 ``` 65 65 66 + Run the API smoke checks from the repo root: 67 + 68 + ```bash 69 + uv run --project packages/scripts/api twister-api-smoke 70 + ``` 71 + 72 + To verify admin endpoints as well, ensure `ADMIN_AUTH_TOKEN` is present in the 73 + environment before running the smoke script. 74 + 66 75 To run the indexer in local file mode as well: 67 76 68 77 ```bash ··· 91 100 VITE_TWISTER_API_BASE_URL=http://localhost:8080 92 101 ``` 93 102 103 + ### Local API DB 104 + 105 + The experimental local API database lives at `packages/api/twister-dev.db`. 106 + Treat it as disposable unless you explicitly back it up. 107 + 108 + Operational rules: 109 + 110 + 1. Stop the API before copying or restoring the file. 111 + 2. Copy `twister-dev.db` and any matching `-wal` or `-shm` sidecars together. 112 + 3. Prefer restore-or-rebuild over manual repair if the DB looks suspect. 113 + 4. Let the file grow during experiments, then compact or delete it afterward. 114 + 115 + Useful local commands: 116 + 117 + ```bash 118 + cd packages/api 119 + du -h twister-dev.db* 120 + ls -lh twister-dev.db* 121 + ``` 122 + 94 123 ## Infrastructure Setup 95 124 96 125 ### Turso ··· 139 168 - `HTTP_BIND_ADDR` 140 169 - `SEARCH_DEFAULT_LIMIT` 141 170 - `SEARCH_MAX_LIMIT` 171 + - `ENABLE_ADMIN_ENDPOINTS` 172 + - `ADMIN_AUTH_TOKEN` 173 + - `READ_THROUGH_MODE` 174 + - `READ_THROUGH_COLLECTIONS` 175 + - `READ_THROUGH_MAX_ATTEMPTS` 142 176 143 177 Set these indexer-specific variables: 144 178 ··· 157 191 3. Verify API readiness and indexer health. 158 192 4. Run `twister backfill` with your seed file. 159 193 5. Treat the environment as search-ready only after historical backfill completes. 160 - 161 - ## Docs 162 - 163 - - Index: [`docs/README.md`](docs/README.md)
+1 -1
apps/twisted/README.md
··· 6 6 7 7 - Node.js 20+ 8 8 - pnpm 9 - - The Twister API running locally (see `packages/api/README.md`) 9 + - The Twister API running locally 10 10 11 11 ## Running locally 12 12
+2 -1
docs/adr/storage.md
··· 127 127 3. Prefer restore-or-rebuild over repair if the DB becomes suspect. 128 128 4. Allow the file to grow during active experiments, then compact or delete it afterward. 129 129 130 - The concrete local backup, restore, and disk-growth procedures live in [packages/api/README.md](/Users/owais/Projects/Twisted/packages/api/README.md). 130 + The concrete local backup, restore, and disk-growth procedures live in 131 + [docs/reference/api.md](/Users/owais/Projects/Twisted/docs/reference/api.md). 131 132 132 133 ## Migration Path To Production Turso 133 134
+57 -2
docs/reference/api.md
··· 61 61 62 62 When `ENABLE_ADMIN_ENDPOINTS=true` with a configured `ADMIN_AUTH_TOKEN`: 63 63 64 + - **`GET /admin/status`** — Tap cursor, JetStream cursor, document count, and 65 + read-through queue status 66 + - **`GET /admin/indexing/jobs`** — List queue rows, filtered by `status`, 67 + `source`, or `document` 68 + - **`GET /admin/indexing/audit`** — List append-only audit rows, filtered by 69 + `source`, `decision`, or `document` 70 + - **`POST /admin/indexing/enqueue`** — Queue a single record by explicit body 64 71 - **`POST /admin/reindex`** — Trigger FTS re-sync 65 72 73 + ### Smoke Checks 74 + 75 + Smoke checks for the API surface live in `packages/scripts/api/`. 76 + 77 + From the repo root: 78 + 79 + ```sh 80 + uv run --project packages/scripts/api twister-api-smoke 81 + ``` 82 + 83 + If `ADMIN_AUTH_TOKEN` is present in the environment, the smoke script can also 84 + verify `GET /admin/status`. 85 + 66 86 ### Static Content 67 87 68 88 The API also serves a search site with live search and API documentation at `/` and `/docs*`, built with Alpine.js (no build step, embedded in `internal/view/`). ··· 81 101 82 102 **record_state** — Issue and PR state cache (open/closed/merged). Keyed by subject AT-URI. 83 103 104 + **indexing_jobs** — Durable read-through/admin queue with status, lease owner, 105 + lease expiry, retry counters, and terminal states (`failed`, `dead_letter`). 106 + 107 + **indexing_audit** — Append-only record of enqueue decisions, retries, skips, 108 + completions, and dead letters. 109 + 84 110 **document_embeddings** — Vector storage (768-dim F32_BLOB with DiskANN cosine index). Schema ready but not yet populated. 85 111 86 112 **embedding_jobs** — Async embedding job queue. Schema ready but worker not yet active. ··· 94 120 3. Normalize into a document (extract title, body, summary, metadata) 95 121 4. Optionally enrich via XRPC (resolve author handle, repo name, web URL) 96 122 5. Upsert into the database (auto-syncs FTS) 97 - 6. 
Advance cursor and acknowledge to Tap 123 + 6. Persist the Tap cursor and then acknowledge the event 124 + 125 + The indexer resumes from its last cursor on restart and replays idempotently. 126 + It logs status every 30 seconds and uses exponential backoff (1s–5s) for 127 + transient failures. 98 128 99 - The indexer resumes from its last cursor on restart (no duplicate processing). It logs status every 30 seconds and uses exponential backoff (1s–5s) for transient failures. 129 + Read-through indexing is `missing` by default. Only allowed collections can be 130 + queued, detail reads queue single focal records, and bulk list handlers no 131 + longer enqueue whole collections. 100 132 101 133 ## Record Normalizers 102 134 ··· 140 172 | `TAP_URL` | — | Tap WebSocket URL | 141 173 | `TAP_AUTH_PASSWORD` | — | Tap admin password | 142 174 | `INDEXED_COLLECTIONS` | all | Collection allowlist (CSV, supports wildcards) | 175 + | `READ_THROUGH_MODE` | missing | `off`, `missing`, or `broad` | 176 + | `READ_THROUGH_COLLECTIONS` | `INDEXED_COLLECTIONS` | Read-through allowlist | 177 + | `READ_THROUGH_MAX_ATTEMPTS`| 5 | Retries before `dead_letter` | 143 178 | `HTTP_BIND_ADDR` | `:8080` | API server bind address | 144 179 | `INDEXER_HEALTH_ADDR` | `:9090` | Indexer health probe address | 145 180 | `LOG_LEVEL` | info | debug/info/warn/error | ··· 159 194 - **tap** — Tap instance (external dependency) 160 195 161 196 All services share the same Turso database. The API and indexer are separate deployments of the same binary with different subcommands. 197 + 198 + ## Experimental Local DB 199 + 200 + The local development database lives at `packages/api/twister-dev.db` when the 201 + API runs with `--local`. 202 + 203 + Operational rules: 204 + 205 + 1. Stop the API before backup or restore. 206 + 2. Copy `twister-dev.db` and any matching `-wal` or `-shm` files together. 207 + 3. Prefer restore-or-rebuild over repair if the file becomes suspect. 208 + 4. 
Let the DB grow during active experiments, then compact or delete it later. 209 + 210 + Useful local inspection: 211 + 212 + ```sh 213 + cd packages/api 214 + du -h twister-dev.db* 215 + ls -lh twister-dev.db* 216 + ```
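The read-through gating described above (mode, collection allowlist, and queuing only missing or stale focal records) can be condensed into a small decision function. This is an illustrative sketch, not the package's actual `Policy` API; the function name and parameters are assumptions.

```go
package main

import "fmt"

// shouldEnqueue sketches the gating rules described above, assuming
// READ_THROUGH_MODE is one of "off", "missing", or "broad":
//   - "off" never queues,
//   - any mode refuses collections outside the allowlist,
//   - "broad" queues every allowed record,
//   - "missing" queues only records that are absent or stale.
func shouldEnqueue(mode string, allowed map[string]bool, collection string, indexed, stale bool) bool {
	if mode == "off" || !allowed[collection] {
		return false
	}
	if mode == "broad" {
		return true
	}
	// "missing" mode: queue only absent or stale records.
	return !indexed || stale
}

func main() {
	allowed := map[string]bool{"sh.tangled.repo": true}
	fmt.Println(shouldEnqueue("missing", allowed, "sh.tangled.repo", false, false)) // absent record
	fmt.Println(shouldEnqueue("missing", allowed, "sh.tangled.repo", true, false))  // already indexed, fresh
	fmt.Println(shouldEnqueue("missing", allowed, "other.collection", false, false))
}
```

Detail handlers would call this once per focal record; bulk list handlers simply never call it, which is what keeps whole collections out of the queue.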
+13 -11
docs/reference/resync.md
··· 21 21 when the `indexer` consumes events from Tap. Completeness depends on which DIDs 22 22 Tap is tracking. 23 23 24 - **Read-through indexing** closes gaps on demand: when the API fetches a record 25 - not yet in the index, it enqueues a background job. This supplements Tap but is 26 - not a substitute for it. 24 + **Read-through indexing** now runs in `missing` mode by default: when the API 25 + fetches a record that is absent or stale, and the collection is allowed, it 26 + enqueues a background job. Bulk list reads no longer enqueue entire collections. 27 27 28 28 **JetStream** feeds only the activity cache (`/activity`). It does not contribute 29 29 to the search index. ··· 110 110 ``` 111 111 112 112 2. Once Tap is tracking the DID, the `indexer` will deliver historical events. 113 - Monitor progress via `GET /admin/status` (requires `ENABLE_ADMIN_ENDPOINTS=true`). 113 + Monitor progress via `GET /admin/status` and inspect backlog or failures with 114 + `GET /admin/indexing/jobs` and `GET /admin/indexing/audit`. 114 115 115 - 3. If you need the record indexed immediately, fetch it through the API — the 116 - read-through indexer will enqueue it automatically. 116 + 3. If you need the record indexed immediately, fetch the detail endpoint through 117 + the API or enqueue it explicitly with `POST /admin/indexing/enqueue`. 
117 118 118 119 ### Enrichment gaps 119 120 ··· 178 179 179 180 Response includes: 180 181 181 - - `tap.cursor` — last Tap event ID processed by the indexer 182 - - `tap.updated_at` — when the cursor was last advanced 183 - - `jetstream.cursor` — JetStream timestamp cursor (activity cache only) 184 - - `documents` — total searchable document count 185 - - `pending_jobs` — read-through indexing jobs not yet processed 182 + - `tap.cursor` and `tap.updated_at` 183 + - `jetstream.cursor` and `jetstream.updated_at` 184 + - `documents` 185 + - `read_through.pending`, `processing`, `completed`, `failed`, `dead_letter` 186 + - `read_through.oldest_pending_age_s` and `oldest_running_age_s` 187 + - `read_through.last_completed_at` and `last_processed_at`
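The `read_through` fields listed above are enough to build a simple backlog health check when resyncing. A hedged sketch, assuming only the field names this document describes; the struct, the health rule, and the threshold are illustrative, not part of the API contract:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// readThroughStatus models the read_through block that GET /admin/status
// returns, per the field list above. Only those fields are assumed.
type readThroughStatus struct {
	Pending           int     `json:"pending"`
	Processing        int     `json:"processing"`
	Failed            int     `json:"failed"`
	DeadLetter        int     `json:"dead_letter"`
	OldestPendingAgeS float64 `json:"oldest_pending_age_s"`
}

// backlogStalled is an illustrative health rule: flag the queue when work is
// pending but nothing is processing, or the oldest pending job is too old.
func backlogStalled(s readThroughStatus, maxAgeS float64) bool {
	return (s.Pending > 0 && s.Processing == 0) || s.OldestPendingAgeS > maxAgeS
}

func main() {
	raw := []byte(`{"pending": 12, "processing": 0, "failed": 1, "dead_letter": 0, "oldest_pending_age_s": 640}`)
	var s readThroughStatus
	if err := json.Unmarshal(raw, &s); err != nil {
		panic(err)
	}
	fmt.Println(backlogStalled(s, 300))
}
```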
+9 -4
docs/specs/search.md
··· 96 96 - the production backend choice is documented with explicit tradeoffs 97 97 - the chosen production backend has a migration path from the experimental local setup 98 98 99 - The concrete local DB operating procedure lives in `packages/api/README.md`. 99 + The concrete local DB operating procedure lives in `docs/reference/api.md`. 100 100 The production migration path is documented in `docs/adr/storage.md`. 101 101 102 102 ### Read-Through Indexing 103 103 104 - When the API fetches a repo, issue, PR, profile, or similar record directly from upstream, it should enqueue background indexing work if that record is not already searchable. Tap remains the primary ingest path; read-through indexing only closes gaps. 104 + When the API fetches a repo, issue, PR, profile, or similar detail record 105 + directly from upstream, it should enqueue background indexing work only when 106 + that record is missing or stale. Tap remains the primary ingest path; 107 + read-through indexing only closes gaps. 
105 108 106 109 Requirements: 107 110 108 111 - add a durable job table for on-demand indexing 109 112 - deduplicate jobs by stable document identity 110 113 - reuse the existing normalization and upsert path 111 - - trigger jobs from the handlers that already fetch upstream records 114 + - trigger jobs from detail handlers that already fetch upstream records 115 + - do not enqueue whole collections from list or browse handlers 112 116 113 117 Acceptance: 114 118 115 119 - a fetched-but-missing record becomes searchable shortly after the first successful API read 116 120 - repeated page views do not create unbounded duplicate work 121 + - queue state and terminal failures are inspectable through admin endpoints 117 122 - failures are visible through logs and smoke tests 118 123 119 124 ### Activity Cache ··· 215 220 } 216 221 ``` 217 222 218 - ## Pragmatic Search Strategy 223 + ## Search Strategy 219 224 220 225 Indexing via Tap is useful but has proven unreliable for maintaining complete, up-to-date coverage. The approach: 221 226
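The dedup requirement above ("deduplicate jobs by stable document identity") can be sketched against an in-memory stand-in for the durable job table. The AT-URI serves as the stable identity here; the helper names are hypothetical, not the package's actual code:

```go
package main

import "fmt"

// jobKey derives the dedup key for a read-through job. The AT-URI already
// names exactly one record (did/collection/rkey), so it works as a stable
// identity; the CID is deliberately excluded so a newer version of the same
// record collapses onto the already-pending job.
func jobKey(uri string) string {
	return uri
}

// enqueue inserts a job only when no job with the same key is queued,
// mirroring the acceptance criterion that repeated page views must not
// create unbounded duplicate work. The map stands in for the job table.
func enqueue(queue map[string]bool, uri string) bool {
	k := jobKey(uri)
	if queue[k] {
		return false // already queued; drop the duplicate
	}
	queue[k] = true
	return true
}

func main() {
	queue := map[string]bool{}
	uri := "at://did:plc:abc123/sh.tangled.repo/3kabc"
	fmt.Println(enqueue(queue, uri)) // first enqueue is accepted
	fmt.Println(enqueue(queue, uri)) // duplicate is dropped
}
```

In the real table the same effect would come from a unique constraint on the identity column plus an upsert-or-ignore insert.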
+2 -185
packages/api/README.md
··· 1 - # Twister 2 - 3 - Tap-based indexing and search API for Tangled. Acts as a proxy layer between the Twisted app and all upstream AT Protocol services (knots, PDS, Bluesky, Constellation, Jetstream). 4 - 5 - ## Requirements 6 - 7 - - Go 1.25+ 8 - - A Turso database (or local SQLite for development) 9 - 10 - ## Running locally 11 - 12 - ```sh 13 - cd packages/api 14 - 15 - # Start the API server with a local SQLite database (twister-dev.db) 16 - go run . api --local 17 - ``` 18 - 19 - The server listens on `:8080` by default. Logs are printed as text when `--local` is set. 20 - 21 - ## API Smoke Tests 22 - 23 - Smoke checks for the API surface live in a uv-managed Python project at 24 - `packages/scripts/api/`. 25 - 26 - From the repo root: 27 - 28 - ```sh 29 - uv run --project packages/scripts/api twister-api-smoke 30 - ``` 31 - 32 - Optional base URL override: 33 - 34 - ```sh 35 - TWISTER_API_BASE_URL=http://localhost:8080 \ 36 - uv run --project packages/scripts/api twister-api-smoke 37 - ``` 38 - 39 - ## Experimental Local DB Operations 40 - 41 - The experimental local database lives at `packages/api/twister-dev.db` when you run Twister from `packages/api` with `--local`. 42 - 43 - This database is for local experimentation only. Treat it as disposable unless you explicitly back it up. 44 - 45 - ### Backup 46 - 47 - Recommended procedure: 48 - 49 - 1. Stop the Twister process using the local DB. 50 - 2. Copy the database file and any SQLite sidecar files if they exist. 51 - 52 - Example: 53 - 54 - ```sh 55 - cd packages/api 56 - mkdir -p backups 57 - timestamp="$(date +%Y%m%d-%H%M%S)" 58 - cp twister-dev.db "backups/twister-dev-${timestamp}.db" 59 - test -f twister-dev.db-wal && cp twister-dev.db-wal "backups/twister-dev-${timestamp}.db-wal" 60 - test -f twister-dev.db-shm && cp twister-dev.db-shm "backups/twister-dev-${timestamp}.db-shm" 61 - ``` 62 - 63 - For this experimental DB, stop-and-copy is preferred over hot backup complexity. 
64 - 65 - ### Restore 66 - 67 - Recommended procedure: 1 + # Twister API 68 2 69 - 1. Stop the Twister process. 70 - 2. Move the current local DB aside if you want to keep it. 71 - 3. Copy the backup file back to `twister-dev.db`. 72 - 4. Restore matching `-wal` and `-shm` files only if they were captured with the same backup set. 73 - 74 - Example: 75 - 76 - ```sh 77 - cd packages/api 78 - mv twister-dev.db "twister-dev.db.broken.$(date +%Y%m%d-%H%M%S)" 2>/dev/null || true 79 - cp backups/twister-dev-YYYYMMDD-HHMMSS.db twister-dev.db 80 - ``` 81 - 82 - After restore, restart Twister and let the app run migrations normally. 83 - 84 - ### Disk Growth 85 - 86 - The local DB will grow during experimentation because of: 87 - 88 - - indexed documents 89 - - FTS tables 90 - - activity cache rows 91 - - repeated backfill or reindex runs 92 - 93 - Recommended operating procedure: 94 - 95 - 1. Check file growth periodically. 96 - 2. Delete and rebuild the experimental DB freely when the dataset is no longer useful. 97 - 3. Run `VACUUM` only when you intentionally want to compact a long-lived local DB. 98 - 4. Keep old backups out of the repo and rotate them manually. 99 - 100 - Example inspection commands: 101 - 102 - ```sh 103 - cd packages/api 104 - du -h twister-dev.db* 105 - ls -lh twister-dev.db* 106 - ``` 107 - 108 - For experimental use, the simplest policy is usually: 109 - 110 - - back up anything worth keeping 111 - - remove the DB when the experiment is over 112 - - let Twister rebuild from migrations and backfill paths 113 - 114 - ### Failure Recovery Rule 115 - 116 - If the experimental DB becomes suspicious or inconsistent, prefer restore-or-rebuild over manual repair. This is a developer convenience database, not the source of truth. 117 - 118 - ## Environment variables 119 - 120 - Copy `.env.example` to `.env` in the repo root (or `packages/api/`). The server loads `.env`, `../.env`, and `../../.env` automatically. 
121 - 122 - | Variable | Default | Description | 123 - | -------------------------- | -------------------------------------- | ------------------------------------------------------- | 124 - | `TURSO_DATABASE_URL` | — | Turso/libSQL connection URL (required unless `--local`) | 125 - | `TURSO_AUTH_TOKEN` | — | Auth token (required for non-file URLs) | 126 - | `HTTP_BIND_ADDR` | `:8080` | Address the HTTP server listens on | 127 - | `LOG_LEVEL` | `info` | Log level (`debug`, `info`, `warn`, `error`) | 128 - | `LOG_FORMAT` | `json` | Log format (`json` or `text`) | 129 - | `SEARCH_DEFAULT_LIMIT` | `20` | Default result count for search | 130 - | `SEARCH_MAX_LIMIT` | `100` | Maximum result count for search | 131 - | `ENABLE_ADMIN_ENDPOINTS` | `false` | Expose `/admin/*` endpoints | 132 - | `ADMIN_AUTH_TOKEN` | — | Bearer token required for admin endpoints | 133 - | `CONSTELLATION_URL` | `https://constellation.microcosm.blue` | Constellation API base URL | 134 - | `CONSTELLATION_USER_AGENT` | `twister/1.0 …` | User-Agent sent to Constellation | 135 - | `TAP_URL` | — | Tap firehose URL (indexer only) | 136 - | `TAP_AUTH_PASSWORD` | — | Tap auth password (indexer only) | 137 - | `INDEXED_COLLECTIONS` | — | Comma-separated AT collections to index | 138 - 139 - ## CLI commands 140 - 141 - ```sh 142 - twister api # Start the HTTP API server 143 - twister indexer # Start the Tap firehose consumer 144 - twister backfill # Seed the index from upstream APIs 145 - twister reindex # Re-process existing documents (re-syncs FTS) 146 - twister enrich # Backfill RepoName, AuthorHandle, WebURL on existing documents 147 - ``` 148 - 149 - ### enrich 150 - 151 - Resolves missing `author_handle`, `repo_name`, and `web_url` fields on documents already 152 - in the database. Run this after deploying enrichment changes or when search results show 153 - documents with empty author handles. 
154 - 155 - ```sh 156 - twister enrich --local # all documents 157 - twister enrich --local --collection sh.tangled.repo 158 - twister enrich --local --did did:plc:abc123 159 - twister enrich --local --dry-run # preview without writing 160 - ``` 161 - 162 - Flags: `--collection`, `--did`, `--document`, `--dry-run`, `--concurrency` (default 5). 163 - 164 - ## Proxy endpoints 165 - 166 - The API proxies all upstream AT Protocol and social-graph requests so the app has a single origin: 167 - 168 - | Route | Upstream | 169 - | ------------------------------- | ------------------------------------------------------------- | 170 - | `GET /proxy/knot/{host}/{nsid}` | `https://{host}/xrpc/{nsid}` | 171 - | `GET /proxy/pds/{host}/{nsid}` | `https://{host}/xrpc/{nsid}` | 172 - | `GET /proxy/bsky/{nsid}` | `https://public.api.bsky.app/xrpc/{nsid}` | 173 - | `GET /identity/resolve` | `https://bsky.social/xrpc/com.atproto.identity.resolveHandle` | 174 - | `GET /identity/did/{did}` | `https://plc.directory/{did}` or `/.well-known/did.json` | 175 - | `GET /backlinks/count` | Constellation `getBacklinksCount` (cached) | 176 - | `WS /activity/stream` | `wss://jetstream2.us-east.bsky.network/subscribe` | 177 - 178 - ## Admin endpoints 179 - 180 - Available when `ENABLE_ADMIN_ENDPOINTS=true`. Require `Authorization: Bearer <ADMIN_AUTH_TOKEN>` when 181 - `ADMIN_AUTH_TOKEN` is set. 182 - 183 - | Route | Description | 184 - | ---------------------- | -------------------------------------------------------- | 185 - | `GET /admin/status` | Tap cursor, JetStream cursor, document count, job queue | 186 - | `POST /admin/reindex` | Re-sync all (or filtered) documents into the FTS index | 3 + This package's operational docs live in [`doc.go`](./doc.go).
+149
packages/api/doc.go
··· 1 + // Twister is the Tap-backed indexing and search API for Tangled. 2 + // 3 + // It proxies upstream AT Protocol services such as knots, PDS endpoints, 4 + // Bluesky, Constellation, and Jetstream so the app can use a single origin. 5 + // 6 + // # Requirements 7 + // 8 + // - Go 1.25+ 9 + // - A Turso database, or local SQLite for development 10 + // 11 + // # Running locally 12 + // 13 + // cd packages/api 14 + // go run . api --local 15 + // 16 + // The local API listens on :8080 by default and uses packages/api/twister-dev.db. 17 + // Logs are printed as text when --local is set. 18 + // 19 + // # API smoke tests 20 + // 21 + // Smoke checks live in packages/scripts/api/. From the repo root: 22 + // 23 + // uv run --project packages/scripts/api twister-api-smoke 24 + // 25 + // Optional base URL override: 26 + // 27 + // TWISTER_API_BASE_URL=http://localhost:8080 \ 28 + // uv run --project packages/scripts/api twister-api-smoke 29 + // 30 + // # Experimental local DB operations 31 + // 32 + // The experimental local database lives at packages/api/twister-dev.db when 33 + // you run Twister with --local. Treat it as disposable unless you explicitly 34 + // back it up. 35 + // 36 + // Backup: 37 + // 38 + // 1. Stop the Twister process using the local DB. 39 + // 2. Copy the database file and any SQLite sidecar files if they exist. 40 + // 41 + // Example: 42 + // 43 + // cd packages/api 44 + // mkdir -p backups 45 + // timestamp="$(date +%Y%m%d-%H%M%S)" 46 + // cp twister-dev.db "backups/twister-dev-${timestamp}.db" 47 + // test -f twister-dev.db-wal && cp twister-dev.db-wal "backups/twister-dev-${timestamp}.db-wal" 48 + // test -f twister-dev.db-shm && cp twister-dev.db-shm "backups/twister-dev-${timestamp}.db-shm" 49 + // 50 + // Restore: 51 + // 52 + // 1. Stop the Twister process. 53 + // 2. Move the current local DB aside if you want to keep it. 54 + // 3. Copy the backup file back to twister-dev.db. 55 + // 4. 
Restore matching -wal and -shm files only if they came from the same set. 56 + // 57 + // Example: 58 + // 59 + // cd packages/api 60 + // mv twister-dev.db "twister-dev.db.broken.$(date +%Y%m%d-%H%M%S)" 2>/dev/null || true 61 + // cp backups/twister-dev-YYYYMMDD-HHMMSS.db twister-dev.db 62 + // 63 + // Disk growth: 64 + // 65 + // The local DB grows because of indexed documents, FTS tables, activity cache 66 + // rows, and repeated backfill or reindex runs. 67 + // 68 + // Recommended operating procedure: 69 + // 70 + // 1. Check file growth periodically. 71 + // 2. Delete and rebuild the DB freely when the dataset is no longer useful. 72 + // 3. Run VACUUM only when you intentionally want to compact a long-lived DB. 73 + // 4. Keep old backups out of the repo and rotate them manually. 74 + // 75 + // Inspection commands: 76 + // 77 + // cd packages/api 78 + // du -h twister-dev.db* 79 + // ls -lh twister-dev.db* 80 + // 81 + // Failure recovery: prefer restore-or-rebuild over manual repair if the 82 + // experimental DB becomes 83 + // suspicious or inconsistent. It is a developer convenience database, not the 84 + // source of truth. 85 + // 86 + // # Environment variables 87 + // 88 + // Copy .env.example to .env in the repo root or packages/api/. The server loads 89 + // .env, ../.env, and ../../.env automatically. 
90 + // 91 + // - TURSO_DATABASE_URL: Turso/libSQL connection URL, required unless --local 92 + // - TURSO_AUTH_TOKEN: auth token, required for non-file URLs 93 + // - HTTP_BIND_ADDR: default :8080 94 + // - LOG_LEVEL: debug, info, warn, or error; default info 95 + // - LOG_FORMAT: json or text; default json 96 + // - SEARCH_DEFAULT_LIMIT: default 20 97 + // - SEARCH_MAX_LIMIT: default 100 98 + // - ENABLE_ADMIN_ENDPOINTS: default false 99 + // - ADMIN_AUTH_TOKEN: bearer token for admin endpoints 100 + // - CONSTELLATION_URL: default https://constellation.microcosm.blue 101 + // - CONSTELLATION_USER_AGENT: user-agent sent to Constellation 102 + // - TAP_URL: Tap firehose URL, indexer only 103 + // - TAP_AUTH_PASSWORD: Tap auth password, indexer only 104 + // - INDEXED_COLLECTIONS: comma-separated AT collections to index 105 + // - READ_THROUGH_MODE: off, missing, or broad; default missing 106 + // - READ_THROUGH_COLLECTIONS: read-through allowlist, default INDEXED_COLLECTIONS 107 + // - READ_THROUGH_MAX_ATTEMPTS: max retries before dead_letter, default 5 108 + // 109 + // # CLI commands 110 + // 111 + // twister api       Start the HTTP API server 112 + // twister indexer   Start the Tap firehose consumer 113 + // twister backfill  Seed the index from upstream APIs 114 + // twister reindex   Re-process existing documents (re-syncs FTS) 115 + // twister enrich    Backfill enrichment fields on existing documents 116 + // 117 + // Enrich: 118 + // 119 + // Resolves missing author_handle, repo_name, and web_url fields on documents 120 + // already in the database. 121 + // 122 + // twister enrich --local 123 + // twister enrich --local --collection sh.tangled.repo 124 + // twister enrich --local --did did:plc:abc123 125 + // twister enrich --local --dry-run 126 + // 127 + // Flags: --collection, --did, --document, --dry-run, --concurrency (default 5). 
128 + // 129 + // # Proxy endpoints 130 + // 131 + // - GET /proxy/knot/{host}/{nsid} -> https://{host}/xrpc/{nsid} 132 + // - GET /proxy/pds/{host}/{nsid} -> https://{host}/xrpc/{nsid} 133 + // - GET /proxy/bsky/{nsid} -> https://public.api.bsky.app/xrpc/{nsid} 134 + // - GET /identity/resolve -> https://bsky.social/xrpc/com.atproto.identity.resolveHandle 135 + // - GET /identity/did/{did} -> https://plc.directory/{did} or /.well-known/did.json 136 + // - GET /backlinks/count -> Constellation getBacklinksCount, cached 137 + // - WS /activity/stream -> wss://jetstream2.us-east.bsky.network/subscribe 138 + // 139 + // # Admin endpoints 140 + // 141 + // Admin routes require ENABLE_ADMIN_ENDPOINTS=true. If ADMIN_AUTH_TOKEN is set, 142 + // requests must send Authorization: Bearer <ADMIN_AUTH_TOKEN>. 143 + // 144 + // - GET /admin/status: cursor state, queue counts, oldest ages, last activity 145 + // - GET /admin/indexing/jobs: inspect queue rows by status, source, or document 146 + // - GET /admin/indexing/audit: inspect append-only indexing audit rows 147 + // - POST /admin/indexing/enqueue: queue one explicit record for indexing 148 + // - POST /admin/reindex: re-sync all or filtered documents into the FTS index 149 + package main
+28 -14
packages/api/internal/api/actors.go
··· 427 427 return 428 428 } 429 429 430 - issues, stateMap, err := s.fetchIssuesAndStates(r, repo.PDS, repo.DID) 430 + issues, stateMap, _, err := s.fetchIssuesAndStates(r, repo.PDS, repo.DID) 431 431 if err != nil { 432 432 writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issues")) 433 433 return ··· 465 465 return 466 466 } 467 467 468 - pulls, statusMap, err := s.fetchPullsAndStatuses(r, repo.PDS, repo.DID) 468 + pulls, statusMap, _, err := s.fetchPullsAndStatuses(r, repo.PDS, repo.DID) 469 469 if err != nil { 470 470 writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pulls")) 471 471 return ··· 504 504 return 505 505 } 506 506 507 - issues, stateMap, err := s.fetchIssuesAndStates(r, actor.PDS, actor.DID) 507 + issues, stateMap, _, err := s.fetchIssuesAndStates(r, actor.PDS, actor.DID) 508 508 if err != nil { 509 509 writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issues")) 510 510 return ··· 535 535 return 536 536 } 537 537 538 - pulls, statusMap, err := s.fetchPullsAndStatuses(r, actor.PDS, actor.DID) 538 + pulls, statusMap, _, err := s.fetchPullsAndStatuses(r, actor.PDS, actor.DID) 539 539 if err != nil { 540 540 writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pulls")) 541 541 return ··· 632 632 } 633 633 s.enqueueXRPCRecord(r.Context(), rec.URI, rec.CID, rec.Value) 634 634 635 - _, stateMap, err := s.fetchIssuesAndStates(r, actor.PDS, actor.DID) 635 + _, stateMap, states, err := s.fetchIssuesAndStates(r, actor.PDS, actor.DID) 636 636 if err != nil { 637 637 writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch issue states")) 638 638 return 639 639 } 640 + for _, entry := range states { 641 + if issue, _ := entry.Value["issue"].(string); issue == rec.URI { 642 + s.syncStateEntry(r.Context(), entry) 643 + break 644 + } 645 + } 640 646 641 647 writeJSON(w, http.StatusOK, issueEntry{ 642 648 recordEntry: 
recordEntry{URI: rec.URI, CID: rec.CID, Value: rec.Value}, ··· 703 709 } 704 710 s.enqueueXRPCRecord(r.Context(), rec.URI, rec.CID, rec.Value) 705 711 706 - _, statusMap, err := s.fetchPullsAndStatuses(r, actor.PDS, actor.DID) 712 + _, statusMap, statuses, err := s.fetchPullsAndStatuses(r, actor.PDS, actor.DID) 707 713 if err != nil { 708 714 writeJSON(w, http.StatusBadGateway, errorBody("upstream_error", "failed to fetch pull statuses")) 709 715 return 710 716 } 717 + for _, entry := range statuses { 718 + if pull, _ := entry.Value["pull"].(string); pull == rec.URI { 719 + s.syncStateEntry(r.Context(), entry) 720 + break 721 + } 722 + } 711 723 712 724 writeJSON(w, http.StatusOK, pullEntry{ 713 725 recordEntry: recordEntry{URI: rec.URI, CID: rec.CID, Value: rec.Value}, ··· 755 767 }) 756 768 } 757 769 758 - func (s *Server) fetchIssuesAndStates(r *http.Request, pds, did string) ([]xrpc.ListRecordEntry, map[string]string, error) { 770 + func (s *Server) fetchIssuesAndStates( 771 + r *http.Request, pds, did string, 772 + ) ([]xrpc.ListRecordEntry, map[string]string, []xrpc.ListRecordEntry, error) { 759 773 issueCh := make(chan []xrpc.ListRecordEntry, 1) 760 774 stateCh := make(chan []xrpc.ListRecordEntry, 1) 761 775 errCh := make(chan error, 2) ··· 786 800 case e := <-stateCh: 787 801 states = e 788 802 case err := <-errCh: 789 - return nil, nil, err 803 + return nil, nil, nil, err 790 804 } 791 805 } 792 806 793 807 stateMap := make(map[string]string, len(states)) 794 - s.enqueueXRPCList(r.Context(), states) 795 808 for _, e := range states { 796 809 issueURI, _ := e.Value["issue"].(string) 797 810 state, _ := e.Value["state"].(string) ··· 800 813 } 801 814 } 802 815 803 - return issues, stateMap, nil 816 + return issues, stateMap, states, nil 804 817 } 805 818 806 - func (s *Server) fetchPullsAndStatuses(r *http.Request, pds, did string) ([]xrpc.ListRecordEntry, map[string]string, error) { 819 + func (s *Server) fetchPullsAndStatuses( 820 + r *http.Request, pds, 
did string, 821 + ) ([]xrpc.ListRecordEntry, map[string]string, []xrpc.ListRecordEntry, error) { 807 822 pullCh := make(chan []xrpc.ListRecordEntry, 1) 808 823 statusCh := make(chan []xrpc.ListRecordEntry, 1) 809 824 errCh := make(chan error, 2) ··· 834 849 case e := <-statusCh: 835 850 statuses = e 836 851 case err := <-errCh: 837 - return nil, nil, err 852 + return nil, nil, nil, err 838 853 } 839 854 } 840 855 841 856 statusMap := make(map[string]string, len(statuses)) 842 - s.enqueueXRPCList(r.Context(), statuses) 843 857 for _, e := range statuses { 844 858 pullURI, _ := e.Value["pull"].(string) 845 859 status, _ := e.Value["status"].(string) ··· 848 862 } 849 863 } 850 864 851 - return pulls, statusMap, nil 865 + return pulls, statusMap, statuses, nil 852 866 } 853 867 854 868 type bskyProfileResponse struct {
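The two matching loops added in the detail handlers above share a shape that could be factored into a helper. A sketch under assumed types (a simplified stand-in for `xrpc.ListRecordEntry`); this is a reviewer suggestion, not code from the package:

```go
package main

import "fmt"

// ListRecordEntry is a simplified stand-in for the xrpc entry type used by
// the handlers above: a record URI plus its value decoded as a generic map.
type ListRecordEntry struct {
	URI   string
	Value map[string]any
}

// focalEntry factors out the matching loop added in the issue and pull
// detail handlers: given state/status entries and the field that points
// back at the focal record ("issue" or "pull"), return the matching entry.
func focalEntry(entries []ListRecordEntry, field, uri string) (ListRecordEntry, bool) {
	for _, e := range entries {
		if ref, _ := e.Value[field].(string); ref == uri {
			return e, true
		}
	}
	return ListRecordEntry{}, false
}

func main() {
	states := []ListRecordEntry{
		{URI: "at://did/sh.tangled.repo.issue.state/1", Value: map[string]any{"issue": "at://did/issue/1", "state": "closed"}},
	}
	if e, ok := focalEntry(states, "issue", "at://did/issue/1"); ok {
		fmt.Println(e.Value["state"])
	}
}
```

With such a helper, each handler becomes `if e, ok := focalEntry(states, "issue", rec.URI); ok { s.syncStateEntry(ctx, e) }`, keeping the field name as the only per-handler difference.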
+97
packages/api/internal/api/admin_indexing.go
···
 1 + package api
 2 + 
 3 + import (
 4 + 	"encoding/json"
 5 + 	"net/http"
 6 + 
 7 + 	"tangled.org/desertthunder.dev/twister/internal/store"
 8 + )
 9 + 
10 + func (s *Server) handleAdminIndexingJobs(w http.ResponseWriter, r *http.Request) {
11 + 	if !s.authorizeAdmin(w, r) {
12 + 		return
13 + 	}
14 + 	limit, err := intParam(r, "limit", 50)
15 + 	if err != nil {
16 + 		writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "limit must be numeric"))
17 + 		return
18 + 	}
19 + 	offset, err := intParam(r, "offset", 0)
20 + 	if err != nil {
21 + 		writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "offset must be numeric"))
22 + 		return
23 + 	}
24 + 	jobs, err := s.store.ListIndexingJobs(r.Context(), store.IndexingJobFilter{
25 + 		Status:     r.URL.Query().Get("status"),
26 + 		Source:     r.URL.Query().Get("source"),
27 + 		DocumentID: r.URL.Query().Get("document"),
28 + 		Limit:      limit,
29 + 		Offset:     offset,
30 + 	})
31 + 	if err != nil {
32 + 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to list jobs"))
33 + 		return
34 + 	}
35 + 	writeJSON(w, http.StatusOK, map[string]any{"jobs": jobs, "limit": limit, "offset": offset})
36 + }
37 + 
38 + func (s *Server) handleAdminIndexingAudit(w http.ResponseWriter, r *http.Request) {
39 + 	if !s.authorizeAdmin(w, r) {
40 + 		return
41 + 	}
42 + 	limit, err := intParam(r, "limit", 50)
43 + 	if err != nil {
44 + 		writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "limit must be numeric"))
45 + 		return
46 + 	}
47 + 	offset, err := intParam(r, "offset", 0)
48 + 	if err != nil {
49 + 		writeJSON(w, http.StatusBadRequest, errorBody("invalid_parameter", "offset must be numeric"))
50 + 		return
51 + 	}
52 + 	entries, err := s.store.ListIndexingAudit(r.Context(), store.IndexingAuditFilter{
53 + 		Source:     r.URL.Query().Get("source"),
54 + 		Decision:   r.URL.Query().Get("decision"),
55 + 		DocumentID: r.URL.Query().Get("document"),
56 + 		Limit:      limit,
57 + 		Offset:     offset,
58 + 	})
59 + 	if err != nil {
60 + 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to list audit"))
61 + 		return
62 + 	}
63 + 	writeJSON(w, http.StatusOK, map[string]any{"audit": entries, "limit": limit, "offset": offset})
64 + }
65 + 
66 + func (s *Server) handleAdminIndexingEnqueue(w http.ResponseWriter, r *http.Request) {
67 + 	if !s.authorizeAdmin(w, r) {
68 + 		return
69 + 	}
70 + 	var body struct {
71 + 		URI   string         `json:"uri"`
72 + 		CID   string         `json:"cid"`
73 + 		Value map[string]any `json:"value"`
74 + 	}
75 + 	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
76 + 		writeJSON(w, http.StatusBadRequest, errorBody("invalid_body", "expected JSON body"))
77 + 		return
78 + 	}
79 + 	if body.URI == "" || body.CID == "" || body.Value == nil {
80 + 		writeJSON(w, http.StatusBadRequest, errorBody("invalid_body", "uri, cid, and value are required"))
81 + 		return
82 + 	}
83 + 	s.enqueueRecordForIndexing(r.Context(), store.IndexSourceAdmin, body.URI, body.CID, body.Value)
84 + 	writeJSON(w, http.StatusAccepted, map[string]string{"status": "accepted"})
85 + }
86 + 
87 + func (s *Server) authorizeAdmin(w http.ResponseWriter, r *http.Request) bool {
88 + 	if s.cfg.AdminAuthToken == "" {
89 + 		return true
90 + 	}
91 + 	token := r.Header.Get("Authorization")
92 + 	if token == "Bearer "+s.cfg.AdminAuthToken {
93 + 		return true
94 + 	}
95 + 	writeJSON(w, http.StatusUnauthorized, errorBody("unauthorized", "invalid admin token"))
96 + 	return false
97 + }
+28 -9
packages/api/internal/api/api.go
···
 14  14 
 15  15 	"tangled.org/desertthunder.dev/twister/internal/config"
 16  16 	"tangled.org/desertthunder.dev/twister/internal/constellation"
     17 + 	idx "tangled.org/desertthunder.dev/twister/internal/index"
 17  18 	"tangled.org/desertthunder.dev/twister/internal/normalize"
 18  19 	"tangled.org/desertthunder.dev/twister/internal/reindex"
 19  20 	"tangled.org/desertthunder.dev/twister/internal/search"
···
 30  31 	constellation *constellation.Client
 31  32 	xrpc          *xrpc.Client
 32  33 	registry      *normalize.Registry
     34 + 	policy        idx.Policy
     35 + 	processor     *idx.Processor
     36 + 	workerID      string
 33  37 }
 34  38 
 35  39 // New creates a new API server.
 36  40 func New(searchRepo *search.Repository, st store.Store, cfg *config.Config, log *slog.Logger, constellation *constellation.Client, xrpcClient *xrpc.Client) *Server {
     41 + 	registry := normalize.NewRegistry()
     42 + 	policy := idx.NewPolicy(cfg.IndexedCollections, cfg.ReadThroughCollections, cfg.ReadThroughMode)
 37  43 	return &Server{
 38  44 		search:        searchRepo,
 39  45 		store:         st,
···
 41  47 		log:           log,
 42  48 		constellation: constellation,
 43  49 		xrpc:          xrpcClient,
 44     - 		registry:      normalize.NewRegistry(),
     50 + 		registry:      registry,
     51 + 		policy:        policy,
     52 + 		processor:     idx.NewProcessor(st, registry, xrpcClient, policy, log),
     53 + 		workerID:      newWorkerID("api"),
 45  54 	}
 46  55 }
···
349 358 
350 359 const tapConsumerName = "indexer-tap-v1"
351 360 
352     - // handleAdminStatus returns Tap cursor, JetStream cursor, document count, and pending job count.
    361 + // handleAdminStatus returns cursor and queue status for indexing subsystems.
353 362 // Route: GET /admin/status
354 363 func (s *Server) handleAdminStatus(w http.ResponseWriter, r *http.Request) {
355 364 	if s.cfg.AdminAuthToken != "" {
···
381 390 		return
382 391 	}
383 392 
384     - 	pending, err := s.store.CountPendingIndexingJobs(r.Context())
    393 + 	stats, err := s.store.GetIndexingJobStats(r.Context())
385 394 	if err != nil {
386     - 		s.log.Error("admin status: count pending jobs failed", slog.String("error", err.Error()))
387     - 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to count pending jobs"))
    395 + 		s.log.Error("admin status: queue stats failed", slog.String("error", err.Error()))
    396 + 		writeJSON(w, http.StatusInternalServerError, errorBody("db_error", "failed to fetch queue stats"))
388 397 		return
389 398 	}
390 399 
···
413 422 	}
414 423 
415 424 	writeJSON(w, http.StatusOK, map[string]any{
416     - 		"tap":          tapJSON,
417     - 		"jetstream":    jsJSON,
418     - 		"documents":    docs,
419     - 		"pending_jobs": pending,
    425 + 		"tap":       tapJSON,
    426 + 		"jetstream": jsJSON,
    427 + 		"documents": docs,
    428 + 		"read_through": map[string]any{
    429 + 			"pending":              stats.Pending,
    430 + 			"processing":           stats.Processing,
    431 + 			"completed":            stats.Completed,
    432 + 			"failed":               stats.Failed,
    433 + 			"dead_letter":          stats.DeadLetter,
    434 + 			"oldest_pending_age_s": ageSeconds(stats.OldestPendingAt),
    435 + 			"oldest_running_age_s": ageSeconds(stats.OldestRunningAt),
    436 + 			"last_completed_at":    stats.LastCompletedAt,
    437 + 			"last_processed_at":    stats.LastProcessedAt,
    438 + 		},
420 439 	})
421 440 }
422 441 
+15
packages/api/internal/api/helpers.go
···
149 149 	}
150 150 	return msg
151 151 }
    152 + 
    153 + func newWorkerID(prefix string) string {
    154 + 	return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
    155 + }
    156 + 
    157 + func ageSeconds(ts string) int64 {
    158 + 	if ts == "" {
    159 + 		return 0
    160 + 	}
    161 + 	parsed, err := time.Parse(time.RFC3339, ts)
    162 + 	if err != nil {
    163 + 		return 0
    164 + 	}
    165 + 	return int64(time.Since(parsed).Seconds())
    166 + }
+193 -146
packages/api/internal/api/readthrough.go
···
  8   8 	"sync"
  9   9 	"time"
 10  10 
     11 + 	idx "tangled.org/desertthunder.dev/twister/internal/index"
 11  12 	"tangled.org/desertthunder.dev/twister/internal/normalize"
 12  13 	"tangled.org/desertthunder.dev/twister/internal/store"
 13  14 	"tangled.org/desertthunder.dev/twister/internal/xrpc"
···
 15  16 
 16  17 const (
 17  18 	readThroughIdlePoll = 1 * time.Second
     19 + 	readThroughLeaseDuration = 30 * time.Second
 18  20 	readThroughStatusInterval = 30 * time.Second
 19      - 	maxIndexingAttempts = 10
 20  21 )
 21  22 
 22  23 func (s *Server) runReadThroughIndexer(ctx context.Context) {
     24 + 	if s.policy.ReadThroughMode() == idx.ReadThroughOff {
     25 + 		s.log.Info("read-through indexer worker disabled")
     26 + 		return
     27 + 	}
     28 + 
 23  29 	ticker := time.NewTicker(readThroughIdlePoll)
 24  30 	defer ticker.Stop()
 25  31 
···
 34  40 			return
 35  41 		}
 36  42 
 37      - 		job, err := s.store.ClaimIndexingJob(ctx)
     43 + 		leaseUntil := time.Now().UTC().Add(readThroughLeaseDuration).Format(time.RFC3339)
     44 + 		job, err := s.store.ClaimIndexingJob(ctx, s.workerID, leaseUntil)
 38  45 		if err != nil {
 39  46 			s.log.Warn("read-through claim failed", slog.String("error", err.Error()))
 40      - 			select {
 41      - 			case <-ctx.Done():
 42      - 				return
 43      - 			case <-ticker.C:
 44      - 			}
     47 + 			<-ticker.C
 45  48 			continue
 46  49 		}
 47  50 		if job == nil {
···
 53  56 			continue
 54  57 		}
 55  58 
 56      - 		if err := s.processReadThroughJob(ctx, job); err != nil {
 57      - 			if job.Attempts+1 >= maxIndexingAttempts {
 58      - 				s.log.Error("read-through job exceeded max attempts; discarding",
 59      - 					slog.String("document_id", job.DocumentID),
 60      - 					slog.Int("attempts", job.Attempts+1),
 61      - 					slog.String("last_error", err.Error()),
 62      - 				)
 63      - 				_ = s.store.CompleteIndexingJob(ctx, job.DocumentID)
 64      - 				continue
 65      - 			}
 66      - 			nextDelay := retryDelay(job.Attempts + 1)
 67      - 			nextAt := time.Now().UTC().Add(nextDelay).Format(time.RFC3339)
 68      - 			retryErr := s.store.RetryIndexingJob(ctx, job.DocumentID, nextAt, truncateErr(err))
 69      - 			if retryErr != nil {
 70      - 				s.log.Error("read-through retry update failed",
 71      - 					slog.String("document_id", job.DocumentID),
 72      - 					slog.String("error", retryErr.Error()),
 73      - 				)
 74      - 				continue
 75      - 			}
 76      - 			s.log.Warn("read-through job failed; scheduled retry",
 77      - 				slog.String("document_id", job.DocumentID),
 78      - 				slog.Int("attempt", job.Attempts+1),
 79      - 				slog.Duration("retry_in", nextDelay),
 80      - 				slog.String("error", err.Error()),
 81      - 			)
     59 + 		result, err := s.processReadThroughJob(ctx, job)
     60 + 		if err != nil {
     61 + 			s.handleReadThroughFailure(ctx, job, err)
 82  62 			continue
 83  63 		}
 84      - 
 85  64 		if err := s.store.CompleteIndexingJob(ctx, job.DocumentID); err != nil {
 86  65 			s.log.Error("read-through complete failed",
 87  66 				slog.String("document_id", job.DocumentID),
···
 90  69 			continue
 91  70 		}
 92  71 
 93      - 		s.log.Debug("read-through job completed", slog.String("document_id", job.DocumentID))
     72 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
     73 + 			Source:     job.Source,
     74 + 			DocumentID: job.DocumentID,
     75 + 			Collection: job.Collection,
     76 + 			CID:        job.CID,
     77 + 			Decision:   result.Decision,
     78 + 			Attempt:    job.Attempts,
     79 + 		})
 94  80 		mu.Lock()
 95  81 		processedTick++
 96  82 		mu.Unlock()
···
110  96 		*processedTick = 0
111  97 		mu.Unlock()
112  98 
113      - 		pending, err := s.store.CountPendingIndexingJobs(ctx)
     99 + 		stats, err := s.store.GetIndexingJobStats(ctx)
114 100 		if err != nil {
115      - 			s.log.Warn("read-through status: count failed", slog.String("error", err.Error()))
    101 + 			s.log.Warn("read-through status: stats failed", slog.String("error", err.Error()))
116 102 			continue
117 103 		}
118 104 		s.log.Info("read-through indexer status",
119 105 			slog.Int64("jobs_processed", n),
120      - 			slog.Int64("jobs_pending", pending),
    106 + 			slog.Int64("pending", stats.Pending),
    107 + 			slog.Int64("processing", stats.Processing),
    108 + 			slog.Int64("failed", stats.Failed),
    109 + 			slog.Int64("dead_letter", stats.DeadLetter),
121 110 		)
122 111 	}
123 112 }
124 113 
125      - func (s *Server) processReadThroughJob(ctx context.Context, job *store.IndexingJob) error {
    115 + func (s *Server) processReadThroughJob(
    116 + 	ctx context.Context, job *store.IndexingJob,
    117 + ) (*idx.Result, error) {
    118 + 	doc, err := s.store.GetDocument(ctx, job.DocumentID)
    119 + 	if err != nil {
    120 + 		return nil, fmt.Errorf("get document: %w", err)
    121 + 	}
    122 + 	if doc != nil && doc.CID == job.CID && doc.DeletedAt == "" {
    123 + 		return &idx.Result{
    124 + 			Decision:   "skip_already_indexed",
    125 + 			DocumentID: job.DocumentID,
    126 + 			Collection: job.Collection,
    127 + 			CID:        job.CID,
    128 + 		}, nil
    129 + 	}
    130 + 
127 131 	record := map[string]any{}
128 132 	if err := json.Unmarshal([]byte(job.RecordJSON), &record); err != nil {
129      - 		return fmt.Errorf("decode record json: %w", err)
    133 + 		return nil, &idx.PermanentError{Decision: "decode_record_json", Err: err}
130 134 	}
131 135 
132 136 	event := normalize.TapRecordEvent{
···
140 144 			Record: record,
141 145 		},
142 146 	}
143      - 
144      - 	if handler, ok := s.registry.StateHandler(job.Collection); ok {
145      - 		update, err := handler.HandleState(event)
146      - 		if err != nil {
147      - 			return fmt.Errorf("state normalize: %w", err)
148      - 		}
149      - 		if err := s.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil {
150      - 			return fmt.Errorf("update state: %w", err)
151      - 		}
152      - 		return nil
153      - 	}
    147 + 	return s.processor.ProcessRecord(ctx, job.Source, event)
    148 + }
154 149 
155      - 	adapter, ok := s.registry.Adapter(job.Collection)
156      - 	if !ok {
157      - 		return nil
    150 + func (s *Server) handleReadThroughFailure(
    151 + 	ctx context.Context, job *store.IndexingJob, err error,
    152 + ) {
    153 + 	if perr, ok := idx.IsPermanent(err); ok {
    154 + 		_ = s.store.FailIndexingJob(ctx, job.DocumentID, store.IndexingJobDeadLetter, truncateErr(perr))
    155 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    156 + 			Source:     job.Source,
    157 + 			DocumentID: job.DocumentID,
    158 + 			Collection: job.Collection,
    159 + 			CID:        job.CID,
    160 + 			Decision:   perr.Decision,
    161 + 			Attempt:    job.Attempts + 1,
    162 + 			Error:      perr.Error(),
    163 + 		})
    164 + 		return
158 165 	}
159 166 
160      - 	doc, err := adapter.Normalize(event)
161      - 	if err != nil {
162      - 		return fmt.Errorf("normalize record: %w", err)
    167 + 	if job.Attempts+1 >= s.cfg.ReadThroughMaxAttempts {
    168 + 		_ = s.store.FailIndexingJob(ctx, job.DocumentID, store.IndexingJobDeadLetter, truncateErr(err))
    169 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    170 + 			Source:     job.Source,
    171 + 			DocumentID: job.DocumentID,
    172 + 			Collection: job.Collection,
    173 + 			CID:        job.CID,
    174 + 			Decision:   "dead_letter",
    175 + 			Attempt:    job.Attempts + 1,
    176 + 			Error:      err.Error(),
    177 + 		})
    178 + 		return
163 179 	}
164 180 
165      - 	handle, err := s.store.GetIdentityHandle(ctx, job.DID)
166      - 	if err != nil {
167      - 		return fmt.Errorf("lookup identity handle: %w", err)
    181 + 	nextDelay := retryDelay(job.Attempts + 1)
    182 + 	nextAt := time.Now().UTC().Add(nextDelay).Format(time.RFC3339)
    183 + 	if retryErr := s.store.RetryIndexingJob(ctx, job.DocumentID, nextAt, truncateErr(err)); retryErr != nil {
    184 + 		s.log.Error("read-through retry update failed",
    185 + 			slog.String("document_id", job.DocumentID),
    186 + 			slog.String("error", retryErr.Error()),
    187 + 		)
    188 + 		return
168 189 	}
169      - 	if handle != "" {
170      - 		doc.AuthorHandle = handle
171      - 		if doc.RecordType == "profile" {
172      - 			doc.Title = handle
173      - 		}
174      - 	}
175      - 
176      - 	s.enrichDocument(ctx, doc, record)
    190 + 	s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    191 + 		Source:     job.Source,
    192 + 		DocumentID: job.DocumentID,
    193 + 		Collection: job.Collection,
    194 + 		CID:        job.CID,
    195 + 		Decision:   "retry_scheduled",
    196 + 		Attempt:    job.Attempts + 1,
    197 + 		Error:      err.Error(),
    198 + 	})
    199 + }
177 200 
178      - 	if err := s.store.UpsertDocument(ctx, doc); err != nil {
179      - 		return fmt.Errorf("upsert document: %w", err)
180      - 	}
    201 + func (s *Server) enqueueXRPCRecord(ctx context.Context, uri, cid string, value map[string]any) {
    202 + 	s.enqueueRecordForIndexing(ctx, store.IndexSourceReadThrough, uri, cid, value)
    203 + }
181 204 
182      - 	return nil
    205 + func (s *Server) enqueueXRPCList(context.Context, []xrpc.ListRecordEntry) {
183 206 }
184 207 
185      - // enrichDocument fills RepoName, AuthorHandle, and WebURL via XRPC when possible.
186      - // Failures are logged but never block indexing.
187      - func (s *Server) enrichDocument(ctx context.Context, doc *store.Document, record map[string]any) {
188      - 	if s.xrpc == nil {
    208 + func (s *Server) enqueueRecordForIndexing(
    209 + 	ctx context.Context, source, uri, cid string, value map[string]any,
    210 + ) {
    211 + 	did, collection, rkey, err := normalize.ParseATURI(uri)
    212 + 	if err != nil {
    213 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    214 + 			Source:     source,
    215 + 			DocumentID: uri,
    216 + 			Collection: collection,
    217 + 			CID:        cid,
    218 + 			Decision:   "skip_invalid_uri",
    219 + 			Error:      err.Error(),
    220 + 		})
189 221 		return
190 222 	}
191 223 
192      - 	if doc.RepoDID != "" && doc.RepoName == "" {
193      - 		repoURI := repoURIFromRecord(record)
194      - 		if repoURI != "" {
195      - 			_, _, repoRKey, err := normalize.ParseATURI(repoURI)
196      - 			if err == nil && repoRKey != "" {
197      - 				name, err := s.xrpc.ResolveRepoName(ctx, doc.RepoDID, repoRKey)
198      - 				if err == nil {
199      - 					doc.RepoName = name
200      - 				} else {
201      - 					s.log.Debug("read-through enrich: resolve repo name failed",
202      - 						slog.String("doc_id", doc.ID),
203      - 						slog.String("repo_did", doc.RepoDID),
204      - 						slog.String("error", err.Error()),
205      - 					)
206      - 				}
207      - 			}
208      - 		}
    224 + 	documentID := normalize.StableID(did, collection, rkey)
    225 + 	if source == store.IndexSourceReadThrough && s.policy.ReadThroughMode() == idx.ReadThroughOff {
    226 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    227 + 			Source: source, DocumentID: documentID, Collection: collection, CID: cid,
    228 + 			Decision: "skip_mode_off",
    229 + 		})
    230 + 		return
209 231 	}
210      - 
211      - 	if doc.AuthorHandle == "" && doc.DID != "" {
212      - 		info, err := s.xrpc.ResolveIdentity(ctx, doc.DID)
213      - 		if err == nil && info.Handle != "" {
214      - 			doc.AuthorHandle = info.Handle
215      - 			if doc.RecordType == "profile" {
216      - 				doc.Title = info.Handle
217      - 			}
218      - 		} else if err != nil {
219      - 			s.log.Debug("read-through enrich: resolve author handle failed",
220      - 				slog.String("doc_id", doc.ID),
221      - 				slog.String("did", doc.DID),
222      - 				slog.String("error", err.Error()),
223      - 			)
224      - 		}
    232 + 	if !s.policy.Allows(source, collection) {
    233 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    234 + 			Source: source, DocumentID: documentID, Collection: collection, CID: cid,
    235 + 			Decision: "skip_collection",
    236 + 		})
    237 + 		return
225 238 	}
226      - 
227      - 	if doc.WebURL == "" {
228      - 		ownerHandle := doc.AuthorHandle
229      - 		if doc.RepoDID != "" && doc.RepoDID != doc.DID {
230      - 			if h, err := s.store.GetIdentityHandle(ctx, doc.RepoDID); err == nil && h != "" {
231      - 				ownerHandle = h
232      - 			} else if info, err := s.xrpc.ResolveIdentity(ctx, doc.RepoDID); err == nil && info.Handle != "" {
233      - 				ownerHandle = info.Handle
234      - 			}
    239 + 	if source == store.IndexSourceReadThrough && s.policy.ReadThroughMode() == idx.ReadThroughMissing {
    240 + 		if s.shouldSkipReadThrough(ctx, documentID, cid) {
    241 + 			s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    242 + 				Source: source, DocumentID: documentID, Collection: collection, CID: cid,
    243 + 				Decision: "skip_already_indexed",
    244 + 			})
    245 + 			return
235 246 		}
236      - 		doc.WebURL = xrpc.BuildWebURL(ownerHandle, doc.RepoName, doc.RecordType, doc.RKey)
237 247 	}
238      - }
239 248 
240      - // repoURIFromRecord extracts the repo AT-URI from common record fields.
241      - // Issues store it in rec["repo"]; pulls store it in rec["target"]["repo"].
242      - func repoURIFromRecord(record map[string]any) string {
243      - 	if uri, _ := record["repo"].(string); uri != "" {
244      - 		return uri
245      - 	}
246      - 	if target, _ := record["target"].(map[string]any); target != nil {
247      - 		if uri, _ := target["repo"].(string); uri != "" {
248      - 			return uri
249      - 		}
250      - 	}
251      - 	return ""
252      - }
253      - 
254      - func (s *Server) enqueueXRPCRecord(ctx context.Context, uri, cid string, value map[string]any) {
255      - 	did, collection, rkey, err := normalize.ParseATURI(uri)
256      - 	if err != nil {
257      - 		s.log.Debug("read-through skip invalid at-uri", slog.String("uri", uri), slog.String("error", err.Error()))
258      - 		return
259      - 	}
260 249 	payload, err := json.Marshal(value)
261 250 	if err != nil {
262      - 		s.log.Debug("read-through skip unmarshalable record", slog.String("uri", uri), slog.String("error", err.Error()))
    251 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    252 + 			Source: source, DocumentID: documentID, Collection: collection, CID: cid,
    253 + 			Decision: "skip_unmarshalable_record", Error: err.Error(),
    254 + 		})
263 255 		return
264 256 	}
265 257 	input := store.IndexingJobInput{
266      - 		DocumentID: normalize.StableID(did, collection, rkey),
    258 + 		DocumentID: documentID,
267 259 		DID:        did,
268 260 		Collection: collection,
269 261 		RKey:       rkey,
270 262 		CID:        cid,
271 263 		RecordJSON: string(payload),
    264 + 		Source:     source,
272 265 	}
273 266 	if err := s.store.EnqueueIndexingJob(ctx, input); err != nil {
274      - 		s.log.Warn("enqueue read-through indexing job failed",
275      - 			slog.String("document_id", input.DocumentID),
    267 + 		s.log.Warn("enqueue indexing job failed",
    268 + 			slog.String("document_id", documentID),
276 269 			slog.String("error", err.Error()),
277 270 		)
    271 + 		return
278 272 	}
    273 + 	s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    274 + 		Source: source, DocumentID: documentID, Collection: collection, CID: cid,
    275 + 		Decision: "enqueued",
    276 + 	})
279 277 }
280 278 
281      - func (s *Server) enqueueXRPCList(ctx context.Context, entries []xrpc.ListRecordEntry) {
282      - 	for _, e := range entries {
283      - 		s.enqueueXRPCRecord(ctx, e.URI, e.CID, e.Value)
    279 + func (s *Server) shouldSkipReadThrough(ctx context.Context, documentID, cid string) bool {
    280 + 	doc, err := s.store.GetDocument(ctx, documentID)
    281 + 	if err == nil && doc != nil && doc.CID == cid && doc.DeletedAt == "" {
    282 + 		return true
    283 + 	}
    284 + 	job, err := s.store.GetIndexingJob(ctx, documentID)
    285 + 	if err != nil || job == nil {
    286 + 		return false
    287 + 	}
    288 + 	return job.CID == cid
    289 + }
    290 + 
    291 + func (s *Server) syncStateEntry(ctx context.Context, entry xrpc.ListRecordEntry) {
    292 + 	did, collection, rkey, err := normalize.ParseATURI(entry.URI)
    293 + 	if err != nil {
    294 + 		return
    295 + 	}
    296 + 	event := normalize.TapRecordEvent{
    297 + 		Type: "record",
    298 + 		Record: &normalize.TapRecord{
    299 + 			DID:        did,
    300 + 			Collection: collection,
    301 + 			RKey:       rkey,
    302 + 			Action:     "create",
    303 + 			CID:        entry.CID,
    304 + 			Record:     entry.Value,
    305 + 		},
    306 + 	}
    307 + 	result, err := s.processor.ProcessRecord(ctx, store.IndexSourceReadThrough, event)
    308 + 	if err != nil {
    309 + 		s.handleReadThroughFailure(ctx, &store.IndexingJob{
    310 + 			DocumentID: normalize.StableID(did, collection, rkey),
    311 + 			Collection: collection,
    312 + 			CID:        entry.CID,
    313 + 			Source:     store.IndexSourceReadThrough,
    314 + 		}, err)
    315 + 		return
    316 + 	}
    317 + 	if result != nil {
    318 + 		s.appendIndexingAudit(ctx, store.IndexingAuditInput{
    319 + 			Source:     store.IndexSourceReadThrough,
    320 + 			DocumentID: result.DocumentID,
    321 + 			Collection: result.Collection,
    322 + 			CID:        result.CID,
    323 + 			Decision:   result.Decision,
    324 + 		})
    325 + 	}
    326 + }
    327 + 
    328 + func (s *Server) appendIndexingAudit(ctx context.Context, input store.IndexingAuditInput) {
    329 + 	if err := s.store.AppendIndexingAudit(ctx, input); err != nil {
    330 + 		s.log.Debug("append indexing audit failed", slog.String("error", err.Error()))
284 331 	}
285 332 }
+156
packages/api/internal/api/readthrough_test.go
···
  1 + package api
  2 + 
  3 + import (
  4 + 	"bytes"
  5 + 	"context"
  6 + 	"encoding/json"
  7 + 	"errors"
  8 + 	"io"
  9 + 	"log/slog"
 10 + 	"net/http"
 11 + 	"net/http/httptest"
 12 + 	"testing"
 13 + 
 14 + 	"tangled.org/desertthunder.dev/twister/internal/config"
 15 + 	idx "tangled.org/desertthunder.dev/twister/internal/index"
 16 + 	"tangled.org/desertthunder.dev/twister/internal/normalize"
 17 + 	"tangled.org/desertthunder.dev/twister/internal/store"
 18 + 	"tangled.org/desertthunder.dev/twister/internal/xrpc"
 19 + )
 20 + 
 21 + func newAPITestServer(st *apiTestStore, client *xrpc.Client) *Server {
 22 + 	cfg := &config.Config{
 23 + 		IndexedCollections:     "sh.tangled.*",
 24 + 		ReadThroughCollections: "sh.tangled.*",
 25 + 		ReadThroughMode:        "missing",
 26 + 		ReadThroughMaxAttempts: 5,
 27 + 	}
 28 + 	log := slog.New(slog.NewTextHandler(io.Discard, nil))
 29 + 	policy := idx.NewPolicy(cfg.IndexedCollections, cfg.ReadThroughCollections, cfg.ReadThroughMode)
 30 + 	registry := normalize.NewRegistry()
 31 + 	return &Server{
 32 + 		store: st, cfg: cfg, log: log, xrpc: client, registry: registry,
 33 + 		policy: policy, processor: idx.NewProcessor(st, registry, client, policy, log), workerID: "api-test",
 34 + 	}
 35 + }
 36 + 
 37 + func TestEnqueueXRPCRecordSkipsExistingDocument(t *testing.T) {
 38 + 	st := newAPITestStore()
 39 + 	st.docs["did:plc:alice|sh.tangled.repo|repo1"] = &store.Document{
 40 + 		ID: "did:plc:alice|sh.tangled.repo|repo1", CID: "cid-1",
 41 + 	}
 42 + 	srv := newAPITestServer(st, nil)
 43 + 
 44 + 	srv.enqueueXRPCRecord(context.Background(), "at://did:plc:alice/sh.tangled.repo/repo1", "cid-1", map[string]any{"name": "repo1"})
 45 + 
 46 + 	if len(st.jobs) != 0 {
 47 + 		t.Fatalf("expected no jobs, got %#v", st.jobs)
 48 + 	}
 49 + 	if len(st.audits) == 0 || st.audits[0].Decision != "skip_already_indexed" {
 50 + 		t.Fatalf("unexpected audit rows: %#v", st.audits)
 51 + 	}
 52 + }
 53 + 
 54 + func TestEnqueueXRPCRecordOnlyQueuesOncePerCID(t *testing.T) {
 55 + 	st := newAPITestStore()
 56 + 	srv := newAPITestServer(st, nil)
 57 + 	uri := "at://did:plc:alice/sh.tangled.repo/repo1"
 58 + 
 59 + 	srv.enqueueXRPCRecord(context.Background(), uri, "cid-1", map[string]any{"name": "repo1"})
 60 + 	srv.enqueueXRPCRecord(context.Background(), uri, "cid-1", map[string]any{"name": "repo1"})
 61 + 
 62 + 	if len(st.jobs) != 1 {
 63 + 		t.Fatalf("expected one queued job, got %#v", st.jobs)
 64 + 	}
 65 + 	if st.jobs["did:plc:alice|sh.tangled.repo|repo1"].Status != store.IndexingJobPending {
 66 + 		t.Fatalf("unexpected job state: %#v", st.jobs["did:plc:alice|sh.tangled.repo|repo1"])
 67 + 	}
 68 + }
 69 + 
 70 + func TestHandleActorFollowingDoesNotEnqueueBulkList(t *testing.T) {
 71 + 	var upstream *httptest.Server
 72 + 	upstream = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 73 + 		switch {
 74 + 		case r.URL.Path == "/xrpc/com.atproto.identity.resolveHandle":
 75 + 			_ = json.NewEncoder(w).Encode(map[string]string{"did": "did:plc:alice"})
 76 + 		case r.URL.Path == "/did%3Aplc%3Aalice" || r.URL.Path == "/did:plc:alice":
 77 + 			_ = json.NewEncoder(w).Encode(map[string]any{
 78 + 				"id":          "did:plc:alice",
 79 + 				"alsoKnownAs": []string{"at://alice.tangled.org"},
 80 + 				"service": []map[string]string{{
 81 + 					"type": "AtprotoPersonalDataServer", "serviceEndpoint": upstream.URL,
 82 + 				}},
 83 + 			})
 84 + 		case r.URL.Path == "/xrpc/com.atproto.repo.listRecords":
 85 + 			_ = json.NewEncoder(w).Encode(map[string]any{
 86 + 				"records": []map[string]any{{
 87 + 					"uri":   "at://did:plc:alice/sh.tangled.graph.follow/1",
 88 + 					"cid":   "cid-1",
 89 + 					"value": map[string]any{"subject": "did:plc:bob"},
 90 + 				}},
 91 + 			})
 92 + 		default:
 93 + 			http.NotFound(w, r)
 94 + 		}
 95 + 	}))
 96 + 	defer upstream.Close()
 97 + 
 98 + 	client := xrpc.NewClient(
 99 + 		xrpc.WithHTTPClient(upstream.Client()),
100 + 		xrpc.WithIdentityService(upstream.URL),
101 + 		xrpc.WithPLCDirectory(upstream.URL),
102 + 	)
103 + 	st := newAPITestStore()
104 + 	srv := newAPITestServer(st, client)
105 + 	mux := http.NewServeMux()
106 + 	mux.HandleFunc("GET /actors/{handle}/following", srv.handleActorFollowing)
107 + 
108 + 	req := httptest.NewRequest(http.MethodGet, "/actors/alice.tangled.org/following", nil)
109 + 	rec := httptest.NewRecorder()
110 + 	mux.ServeHTTP(rec, req)
111 + 
112 + 	if rec.Code != http.StatusOK {
113 + 		t.Fatalf("status: got %d body=%s", rec.Code, rec.Body.String())
114 + 	}
115 + 	if len(st.jobs) != 0 {
116 + 		t.Fatalf("expected no queued jobs from list handler, got %#v", st.jobs)
117 + 	}
118 + }
119 + 
120 + func TestDeadLetterJobVisibleThroughAdminEndpoints(t *testing.T) {
121 + 	st := newAPITestStore()
122 + 	job := &store.IndexingJob{
123 + 		DocumentID: "did:plc:alice|sh.tangled.repo|repo1",
124 + 		Collection: "sh.tangled.repo",
125 + 		CID:        "cid-1",
126 + 		Source:     store.IndexSourceReadThrough,
127 + 		Status:     store.IndexingJobProcessing,
128 + 	}
129 + 	st.jobs[job.DocumentID] = job
130 + 	srv := newAPITestServer(st, nil)
131 + 
132 + 	srv.handleReadThroughFailure(context.Background(), job, &idx.PermanentError{
133 + 		Decision: "normalize_failed",
134 + 		Err:      errors.New("boom"),
135 + 	})
136 + 
137 + 	req := httptest.NewRequest(http.MethodGet, "/admin/indexing/jobs", nil)
138 + 	rec := httptest.NewRecorder()
139 + 	srv.handleAdminIndexingJobs(rec, req)
140 + 	if rec.Code != http.StatusOK {
141 + 		t.Fatalf("jobs status: got %d body=%s", rec.Code, rec.Body.String())
142 + 	}
143 + 	if st.jobs[job.DocumentID].Status != store.IndexingJobDeadLetter {
144 + 		t.Fatalf("expected dead letter status, got %#v", st.jobs[job.DocumentID])
145 + 	}
146 + 
147 + 	req = httptest.NewRequest(http.MethodGet, "/admin/indexing/audit?document="+job.DocumentID, nil)
148 + 	rec = httptest.NewRecorder()
149 + 	srv.handleAdminIndexingAudit(rec, req)
150 + 	if rec.Code != http.StatusOK {
151 + 		t.Fatalf("audit status: got %d body=%s", rec.Code, rec.Body.String())
152 + 	}
153 + 	if !bytes.Contains(rec.Body.Bytes(), []byte("normalize_failed")) {
154 + 		t.Fatalf("expected normalize_failed in audit body: %s", rec.Body.String())
155 + 	}
156 + }
+3
packages/api/internal/api/router.go
···
 98  98 		return
 99  99 	}
100 100 	mux.HandleFunc("GET /admin/status", s.handleAdminStatus)
    101 + 	mux.HandleFunc("GET /admin/indexing/jobs", s.handleAdminIndexingJobs)
    102 + 	mux.HandleFunc("GET /admin/indexing/audit", s.handleAdminIndexingAudit)
    103 + 	mux.HandleFunc("POST /admin/indexing/enqueue", s.handleAdminIndexingEnqueue)
101 104 	mux.HandleFunc("POST /admin/reindex", s.handleAdminReindex)
102 105 }
103 106 
+165
packages/api/internal/api/test_store_test.go
···
  1 + package api
  2 + 
  3 + import (
  4 + 	"context"
  5 + 	"sort"
  6 + 	"time"
  7 + 
  8 + 	"tangled.org/desertthunder.dev/twister/internal/store"
  9 + )
 10 + 
 11 + type apiTestStore struct {
 12 + 	docs    map[string]*store.Document
 13 + 	jobs    map[string]*store.IndexingJob
 14 + 	audits  []*store.IndexingAuditEntry
 15 + 	handles map[string]string
 16 + }
 17 + 
 18 + func newAPITestStore() *apiTestStore {
 19 + 	return &apiTestStore{
 20 + 		docs:    map[string]*store.Document{},
 21 + 		jobs:    map[string]*store.IndexingJob{},
 22 + 		handles: map[string]string{},
 23 + 	}
 24 + }
 25 + 
 26 + func (s *apiTestStore) UpsertDocument(_ context.Context, doc *store.Document) error {
 27 + 	clone := *doc
 28 + 	s.docs[doc.ID] = &clone
 29 + 	return nil
 30 + }
 31 + func (s *apiTestStore) GetDocument(_ context.Context, id string) (*store.Document, error) {
 32 + 	return s.docs[id], nil
 33 + }
 34 + func (s *apiTestStore) MarkDeleted(_ context.Context, id string) error {
 35 + 	if doc := s.docs[id]; doc != nil {
 36 + 		doc.DeletedAt = time.Now().UTC().Format(time.RFC3339)
 37 + 	}
 38 + 	return nil
 39 + }
 40 + func (s *apiTestStore) ListDocuments(_ context.Context, _ store.DocumentFilter) ([]*store.Document, error) {
 41 + 	return nil, nil
 42 + }
 43 + func (s *apiTestStore) OptimizeFTS(_ context.Context) error { return nil }
 44 + func (s *apiTestStore) GetSyncState(_ context.Context, _ string) (*store.SyncState, error) {
 45 + 	return nil, nil
 46 + }
 47 + func (s *apiTestStore) SetSyncState(_ context.Context, _, _ string) error { return nil }
 48 + func (s *apiTestStore) UpdateRecordState(_ context.Context, _, _ string) error { return nil }
 49 + func (s *apiTestStore) UpsertIdentityHandle(_ context.Context, did, handle string, _ bool, _ string) error {
 50 + 	s.handles[did] = handle
 51 + 	return nil
 52 + }
 53 + func (s *apiTestStore) GetIdentityHandle(_ context.Context, did string) (string, error) {
 54 + 	return s.handles[did], nil
 55 + }
 56 + func (s *apiTestStore) GetIndexingJob(_ context.Context, documentID string) (*store.IndexingJob, error) {
 57 + 	return s.jobs[documentID], nil
 58 + }
 59 + func (s *apiTestStore) EnqueueIndexingJob(_ context.Context, input store.IndexingJobInput) error {
 60 + 	s.jobs[input.DocumentID] = &store.IndexingJob{
 61 + 		DocumentID: input.DocumentID, DID: input.DID, Collection: input.Collection,
 62 + 		RKey: input.RKey, CID: input.CID, RecordJSON: input.RecordJSON, Source: input.Source,
 63 + 		Status: store.IndexingJobPending, ScheduledAt: time.Now().UTC().Format(time.RFC3339),
 64 + 		UpdatedAt: time.Now().UTC().Format(time.RFC3339),
 65 + 	}
 66 + 	return nil
 67 + }
 68 + func (s *apiTestStore) ClaimIndexingJob(context.Context, string, string) (*store.IndexingJob, error) {
 69 + 	return nil, nil
 70 + }
 71 + func (s *apiTestStore) CompleteIndexingJob(_ context.Context, documentID string) error {
 72 + 	if j := s.jobs[documentID]; j != nil {
 73 + 		j.Status = store.IndexingJobCompleted
 74 + 	}
 75 + 	return nil
 76 + }
 77 + func (s *apiTestStore) RetryIndexingJob(_ context.Context, documentID, nextAt, lastError string) error {
 78 + 	if j := s.jobs[documentID]; j != nil {
 79 + 		j.Status, j.ScheduledAt, j.LastError = store.IndexingJobPending, nextAt, lastError
 80 + 		j.Attempts++
 81 + 	}
 82 + 	return nil
 83 + }
 84 + func (s *apiTestStore) FailIndexingJob(_ context.Context, documentID, status, lastError string) error {
 85 + 	if j := s.jobs[documentID]; j != nil {
 86 + 		j.Status, j.LastError = status, lastError
 87 + 		j.Attempts++
 88 + 	}
 89 + 	return nil
 90 + }
 91 + func (s *apiTestStore) ListIndexingJobs(_ context.Context, filter store.IndexingJobFilter) ([]*store.IndexingJob, error) {
 92 + 	out := []*store.IndexingJob{}
 93 + 	for _, job := range s.jobs {
 94 + 		if filter.DocumentID != "" && job.DocumentID != filter.DocumentID {
 95 + 			continue
 96 + 		}
 97 + 		if filter.Status != "" && job.Status != filter.Status {
 98 + 			continue
 99 + 		}
100 + 		if filter.Source != "" && job.Source != filter.Source {
101 + 			continue
102 + 		}
103 + 		clone := *job
104 + 		out = append(out, &clone)
105 + 	}
106 + 	sort.Slice(out, func(i, j int) bool { return out[i].DocumentID < out[j].DocumentID })
107 + 	return out, nil
108 + }
109 + func (s *apiTestStore) GetIndexingJobStats(_ context.Context) (*store.IndexingJobStats, error) {
110 + 	stats := &store.IndexingJobStats{}
111 + 	for _, job := range s.jobs {
112 + 		switch job.Status {
113 + 		case store.IndexingJobPending:
114 + 			stats.Pending++
115 + 		case store.IndexingJobProcessing:
116 + 			stats.Processing++
117 + 		case store.IndexingJobCompleted:
118 + 			stats.Completed++
119 + 		case store.IndexingJobFailed:
120 + 			stats.Failed++
121 + 		case store.IndexingJobDeadLetter:
122 + 			stats.DeadLetter++
123 + 		}
124 + 	}
125 + 	return stats, nil
126 + }
127 + func (s *apiTestStore) AppendIndexingAudit(_ context.Context, input store.IndexingAuditInput) error {
128 + 	s.audits = append(s.audits, &store.IndexingAuditEntry{
129 + 		ID: int64(len(s.audits) + 1), Source: input.Source, DocumentID: input.DocumentID,
130 + 		Collection: input.Collection, CID: input.CID, Decision: input.Decision,
131 + 		Attempt: input.Attempt, Error: input.Error, CreatedAt: time.Now().UTC().Format(time.RFC3339),
132 + 	})
133 + 	return nil
134 + }
135 + func (s *apiTestStore) ListIndexingAudit(_ context.Context, filter store.IndexingAuditFilter) ([]*store.IndexingAuditEntry, error) {
136 + 	out := []*store.IndexingAuditEntry{}
137 + 	for _, entry := range s.audits {
138 + 		if filter.DocumentID != "" && entry.DocumentID != filter.DocumentID {
139 + 			continue
140 + 		}
141 + 		if filter.Source != "" && entry.Source != filter.Source {
142 + 			continue
143 + 		}
144 + 		if filter.Decision != "" && entry.Decision != filter.Decision {
145 + 			continue
146 + 		}
147 + 		out = append(out, entry)
148 + 	}
149 + 	return out, nil
150 + }
151 + func (s *apiTestStore) GetFollowSubjects(context.Context, string) ([]string, error) { return nil, nil }
152 + func (s *apiTestStore) GetRepoCollaborators(context.Context, string) ([]string, error) {
153 + 	return nil, nil
154 + }
155 + func (s *apiTestStore) CountDocuments(_ context.Context) (int64, error) {
156 + 	return int64(len(s.docs)), nil
157 + }
158 + func (s *apiTestStore) CountPendingIndexingJobs(_ context.Context) (int64, error) { return 0, nil }
159 + func (s *apiTestStore) InsertJetstreamEvent(context.Context, *store.JetstreamEvent, int) error {
160 + 	return nil
161 + }
162 + func (s *apiTestStore) ListJetstreamEvents(context.Context, store.JetstreamEventFilter) ([]*store.JetstreamEvent, error) {
163 + 	return nil, nil
164 + }
165 + func (s *apiTestStore) Ping(context.Context) error { return nil }
+55 -49
packages/api/internal/config/config.go
··· 12 12 ) 13 13 14 14 type Config struct { 15 - TursoURL string 16 - TursoToken string 17 - TapURL string 18 - TapAuthPassword string 19 - IndexedCollections string 20 - SearchDefaultLimit int 21 - SearchMaxLimit int 22 - SearchDefaultMode string 23 - HTTPBindAddr string 24 - IndexerHealthAddr string 25 - LogLevel string 26 - LogFormat string 27 - EnableAdminEndpoints bool 28 - AdminAuthToken string 29 - EnableIngestEnrichment bool 30 - PLCDirectoryURL string 31 - IdentityServiceURL string 32 - XRPCTimeout time.Duration 33 - ConstellationURL string 34 - ConstellationUserAgent string 35 - ConstellationTimeout time.Duration 36 - ConstellationCacheTTL time.Duration 37 - OAuthClientID string 38 - OAuthRedirectURIs []string 39 - JetstreamURL string 15 + TursoURL string 16 + TursoToken string 17 + TapURL string 18 + TapAuthPassword string 19 + IndexedCollections string 20 + ReadThroughMode string 21 + ReadThroughCollections string 22 + ReadThroughMaxAttempts int 23 + SearchDefaultLimit int 24 + SearchMaxLimit int 25 + SearchDefaultMode string 26 + HTTPBindAddr string 27 + IndexerHealthAddr string 28 + LogLevel string 29 + LogFormat string 30 + EnableAdminEndpoints bool 31 + AdminAuthToken string 32 + EnableIngestEnrichment bool 33 + PLCDirectoryURL string 34 + IdentityServiceURL string 35 + XRPCTimeout time.Duration 36 + ConstellationURL string 37 + ConstellationUserAgent string 38 + ConstellationTimeout time.Duration 39 + ConstellationCacheTTL time.Duration 40 + OAuthClientID string 41 + OAuthRedirectURIs []string 42 + JetstreamURL string 40 43 JetstreamWantedCollections string 41 - ActivityMaxEvents int 42 - ActivityRewindDuration time.Duration 44 + ActivityMaxEvents int 45 + ActivityRewindDuration time.Duration 43 46 } 44 47 45 48 type LoadOptions struct { ··· 51 54 loadDotEnv() 52 55 53 56 cfg := &Config{ 54 - TursoURL: os.Getenv("TURSO_DATABASE_URL"), 55 - TursoToken: os.Getenv("TURSO_AUTH_TOKEN"), 56 - TapURL: os.Getenv("TAP_URL"), 57 - TapAuthPassword: 
os.Getenv("TAP_AUTH_PASSWORD"), 58 - IndexedCollections: os.Getenv("INDEXED_COLLECTIONS"), 59 - SearchDefaultMode: envOrDefault("SEARCH_DEFAULT_MODE", "keyword"), 60 - HTTPBindAddr: envOrDefault("HTTP_BIND_ADDR", ":8080"), 61 - IndexerHealthAddr: envOrDefault("INDEXER_HEALTH_ADDR", ":9090"), 62 - LogLevel: envOrDefault("LOG_LEVEL", "info"), 63 - LogFormat: envOrDefault("LOG_FORMAT", "json"), 64 - AdminAuthToken: os.Getenv("ADMIN_AUTH_TOKEN"), 65 - SearchDefaultLimit: envInt("SEARCH_DEFAULT_LIMIT", 20), 66 - SearchMaxLimit: envInt("SEARCH_MAX_LIMIT", 100), 67 - EnableAdminEndpoints: envBool("ENABLE_ADMIN_ENDPOINTS", false), 68 - EnableIngestEnrichment: envBool("ENABLE_INGEST_ENRICHMENT", true), 69 - PLCDirectoryURL: envOrDefault("PLC_DIRECTORY_URL", "https://plc.directory"), 70 - IdentityServiceURL: envOrDefault("IDENTITY_SERVICE_URL", "https://public.api.bsky.app"), 71 - XRPCTimeout: envDuration("XRPC_TIMEOUT", 15*time.Second), 72 - ConstellationURL: envOrDefault("CONSTELLATION_URL", "https://constellation.microcosm.blue"), 73 - ConstellationUserAgent: envOrDefault("CONSTELLATION_USER_AGENT", "twister/1.0 (https://tangled.org/desertthunder.dev/twisted; Owais <desertthunder.dev@gmail.com>)"), 74 - ConstellationTimeout: envDuration("CONSTELLATION_TIMEOUT", 10*time.Second), 75 - ConstellationCacheTTL: envDuration("CONSTELLATION_CACHE_TTL", 5*time.Minute), 57 + TursoURL: os.Getenv("TURSO_DATABASE_URL"), 58 + TursoToken: os.Getenv("TURSO_AUTH_TOKEN"), 59 + TapURL: os.Getenv("TAP_URL"), 60 + TapAuthPassword: os.Getenv("TAP_AUTH_PASSWORD"), 61 + IndexedCollections: os.Getenv("INDEXED_COLLECTIONS"), 62 + ReadThroughMode: envOrDefault("READ_THROUGH_MODE", "missing"), 63 + ReadThroughCollections: envOrDefault("READ_THROUGH_COLLECTIONS", os.Getenv("INDEXED_COLLECTIONS")), 64 + ReadThroughMaxAttempts: envInt("READ_THROUGH_MAX_ATTEMPTS", 5), 65 + SearchDefaultMode: envOrDefault("SEARCH_DEFAULT_MODE", "keyword"), 66 + HTTPBindAddr: envOrDefault("HTTP_BIND_ADDR", ":8080"), 67 + 
IndexerHealthAddr: envOrDefault("INDEXER_HEALTH_ADDR", ":9090"), 68 + LogLevel: envOrDefault("LOG_LEVEL", "info"), 69 + LogFormat: envOrDefault("LOG_FORMAT", "json"), 70 + AdminAuthToken: os.Getenv("ADMIN_AUTH_TOKEN"), 71 + SearchDefaultLimit: envInt("SEARCH_DEFAULT_LIMIT", 20), 72 + SearchMaxLimit: envInt("SEARCH_MAX_LIMIT", 100), 73 + EnableAdminEndpoints: envBool("ENABLE_ADMIN_ENDPOINTS", false), 74 + EnableIngestEnrichment: envBool("ENABLE_INGEST_ENRICHMENT", true), 75 + PLCDirectoryURL: envOrDefault("PLC_DIRECTORY_URL", "https://plc.directory"), 76 + IdentityServiceURL: envOrDefault("IDENTITY_SERVICE_URL", "https://public.api.bsky.app"), 77 + XRPCTimeout: envDuration("XRPC_TIMEOUT", 15*time.Second), 78 + ConstellationURL: envOrDefault("CONSTELLATION_URL", "https://constellation.microcosm.blue"), 79 + ConstellationUserAgent: envOrDefault("CONSTELLATION_USER_AGENT", "twister/1.0 (https://tangled.org/desertthunder.dev/twisted; Owais <desertthunder.dev@gmail.com>)"), 80 + ConstellationTimeout: envDuration("CONSTELLATION_TIMEOUT", 10*time.Second), 81 + ConstellationCacheTTL: envDuration("CONSTELLATION_CACHE_TTL", 5*time.Minute), 76 82 OAuthClientID: os.Getenv("OAUTH_CLIENT_ID"), 77 83 OAuthRedirectURIs: envSlice("OAUTH_REDIRECT_URIS", nil), 78 84 JetstreamURL: envOrDefault("JETSTREAM_URL", "wss://jetstream2.us-east.bsky.network/subscribe"),
+23
packages/api/internal/config/config_test.go
··· 57 57 t.Fatalf("TursoURL: got %q, want %q", cfg.TursoURL, wantURL) 58 58 } 59 59 } 60 + 61 + func TestLoadReadThroughDefaults(t *testing.T) { 62 + t.Setenv("TURSO_DATABASE_URL", "file:test.db") 63 + t.Setenv("TURSO_AUTH_TOKEN", "") 64 + t.Setenv("INDEXED_COLLECTIONS", "sh.tangled.repo,sh.tangled.repo.issue") 65 + t.Setenv("READ_THROUGH_MODE", "") 66 + t.Setenv("READ_THROUGH_COLLECTIONS", "") 67 + t.Setenv("READ_THROUGH_MAX_ATTEMPTS", "") 68 + 69 + cfg, err := Load(LoadOptions{}) 70 + if err != nil { 71 + t.Fatalf("load config: %v", err) 72 + } 73 + if cfg.ReadThroughMode != "missing" { 74 + t.Fatalf("ReadThroughMode: got %q", cfg.ReadThroughMode) 75 + } 76 + if cfg.ReadThroughCollections != "sh.tangled.repo,sh.tangled.repo.issue" { 77 + t.Fatalf("ReadThroughCollections: got %q", cfg.ReadThroughCollections) 78 + } 79 + if cfg.ReadThroughMaxAttempts != 5 { 80 + t.Fatalf("ReadThroughMaxAttempts: got %d", cfg.ReadThroughMaxAttempts) 81 + } 82 + }
+68
packages/api/internal/index/enrich.go
··· 1 + package index 2 + 3 + import ( 4 + "context" 5 + 6 + "tangled.org/desertthunder.dev/twister/internal/normalize" 7 + "tangled.org/desertthunder.dev/twister/internal/store" 8 + "tangled.org/desertthunder.dev/twister/internal/xrpc" 9 + ) 10 + 11 + func (p *Processor) enrichDocument( 12 + ctx context.Context, doc *store.Document, record map[string]any, 13 + ) error { 14 + handle, err := p.store.GetIdentityHandle(ctx, doc.DID) 15 + if err != nil { 16 + return err 17 + } 18 + if handle != "" { 19 + doc.AuthorHandle = handle 20 + if doc.RecordType == "profile" { 21 + doc.Title = handle 22 + } 23 + } 24 + if p.xrpc == nil { 25 + return nil 26 + } 27 + if doc.RepoDID != "" && doc.RepoName == "" { 28 + if uri := repoURIFromRecord(record); uri != "" { 29 + _, _, repoRKey, err := normalize.ParseATURI(uri) 30 + if err == nil && repoRKey != "" { 31 + if name, err := p.xrpc.ResolveRepoName(ctx, doc.RepoDID, repoRKey); err == nil { 32 + doc.RepoName = name 33 + } 34 + } 35 + } 36 + } 37 + if doc.AuthorHandle == "" && doc.DID != "" { 38 + if info, err := p.xrpc.ResolveIdentity(ctx, doc.DID); err == nil && info.Handle != "" { 39 + doc.AuthorHandle = info.Handle 40 + if doc.RecordType == "profile" { 41 + doc.Title = info.Handle 42 + } 43 + } 44 + } 45 + if doc.WebURL == "" { 46 + ownerHandle := doc.AuthorHandle 47 + if doc.RepoDID != "" && doc.RepoDID != doc.DID { 48 + if handle, err := p.store.GetIdentityHandle(ctx, doc.RepoDID); err == nil && handle != "" { 49 + ownerHandle = handle 50 + } else if info, err := p.xrpc.ResolveIdentity(ctx, doc.RepoDID); err == nil && info.Handle != "" { 51 + ownerHandle = info.Handle 52 + } 53 + } 54 + doc.WebURL = xrpc.BuildWebURL(ownerHandle, doc.RepoName, doc.RecordType, doc.RKey) 55 + } 56 + return nil 57 + } 58 + 59 + func repoURIFromRecord(record map[string]any) string { 60 + if uri, _ := record["repo"].(string); uri != "" { 61 + return uri 62 + } 63 + target, _ := record["target"].(map[string]any) 64 + if uri, _ := 
target["repo"].(string); uri != "" { 65 + return uri 66 + } 67 + return "" 68 + }
+24
packages/api/internal/index/errors.go
··· 1 + package index 2 + 3 + import "fmt" 4 + 5 + type PermanentError struct { 6 + Decision string 7 + Err error 8 + } 9 + 10 + func (e *PermanentError) Error() string { 11 + if e.Err == nil { 12 + return e.Decision 13 + } 14 + return fmt.Sprintf("%s: %v", e.Decision, e.Err) 15 + } 16 + 17 + func (e *PermanentError) Unwrap() error { 18 + return e.Err 19 + } 20 + 21 + func IsPermanent(err error) (*PermanentError, bool) { 22 + perr, ok := err.(*PermanentError) 23 + return perr, ok 24 + }
-1
packages/api/internal/index/index.go
··· 1 - package index
+85
packages/api/internal/index/policy.go
··· 1 + package index 2 + 3 + import "strings" 4 + 5 + const ( 6 + ReadThroughOff = "off" 7 + ReadThroughMissing = "missing" 8 + ReadThroughBroad = "broad" 9 + ) 10 + 11 + type Policy struct { 12 + indexed allowlist 13 + readThrough allowlist 14 + mode string 15 + } 16 + 17 + func NewPolicy(indexedCollections, readThroughCollections, mode string) Policy { 18 + if strings.TrimSpace(readThroughCollections) == "" { 19 + readThroughCollections = indexedCollections 20 + } 21 + return Policy{ 22 + indexed: parseAllowlist(indexedCollections), 23 + readThrough: parseAllowlist(readThroughCollections), 24 + mode: normalizeMode(mode), 25 + } 26 + } 27 + 28 + func (p Policy) Allows(source, collection string) bool { 29 + if source == "read_through" { 30 + return p.readThrough.match(collection) 31 + } 32 + return p.indexed.match(collection) 33 + } 34 + 35 + func (p Policy) ReadThroughMode() string { 36 + return p.mode 37 + } 38 + 39 + func normalizeMode(mode string) string { 40 + switch strings.TrimSpace(strings.ToLower(mode)) { 41 + case ReadThroughOff, ReadThroughBroad: 42 + return strings.TrimSpace(strings.ToLower(mode)) 43 + default: 44 + return ReadThroughMissing 45 + } 46 + } 47 + 48 + type allowlist struct { 49 + entries []string 50 + } 51 + 52 + func parseAllowlist(raw string) allowlist { 53 + if strings.TrimSpace(raw) == "" { 54 + return allowlist{} 55 + } 56 + parts := strings.FieldsFunc(raw, func(r rune) bool { 57 + return r == ',' || r == ' ' || r == '\n' || r == '\t' 58 + }) 59 + entries := make([]string, 0, len(parts)) 60 + for _, part := range parts { 61 + entry := strings.TrimSpace(part) 62 + if entry != "" { 63 + entries = append(entries, entry) 64 + } 65 + } 66 + return allowlist{entries: entries} 67 + } 68 + 69 + func (a allowlist) match(collection string) bool { 70 + if len(a.entries) == 0 { 71 + return true 72 + } 73 + for _, entry := range a.entries { 74 + if entry == collection { 75 + return true 76 + } 77 + if strings.HasSuffix(entry, "*") { 78 + prefix 
:= strings.TrimSuffix(entry, "*") 79 + if strings.HasPrefix(collection, prefix) { 80 + return true 81 + } 82 + } 83 + } 84 + return false 85 + }
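The allowlist grammar above accepts comma-, space-, newline-, or tab-separated entries, supports a trailing-`*` wildcard prefix, and treats an empty list as "allow everything". A self-contained sketch of the matching rules (the parsing and matching are re-stated here outside the package):

```go
package main

import (
	"fmt"
	"strings"
)

// parseAllowlist splits a raw allowlist string on commas and whitespace,
// dropping empty entries, mirroring the policy.go logic in the diff.
func parseAllowlist(raw string) []string {
	parts := strings.FieldsFunc(raw, func(r rune) bool {
		return r == ',' || r == ' ' || r == '\n' || r == '\t'
	})
	entries := make([]string, 0, len(parts))
	for _, part := range parts {
		if e := strings.TrimSpace(part); e != "" {
			entries = append(entries, e)
		}
	}
	return entries
}

// match returns true for an exact entry, a trailing-* prefix match,
// or any collection when the allowlist is empty.
func match(entries []string, collection string) bool {
	if len(entries) == 0 {
		return true
	}
	for _, entry := range entries {
		if entry == collection {
			return true
		}
		if strings.HasSuffix(entry, "*") &&
			strings.HasPrefix(collection, strings.TrimSuffix(entry, "*")) {
			return true
		}
	}
	return false
}

func main() {
	entries := parseAllowlist("sh.tangled.repo, sh.tangled.actor.*")
	fmt.Println(match(entries, "sh.tangled.repo"))          // true: exact
	fmt.Println(match(entries, "sh.tangled.actor.profile")) // true: wildcard
	fmt.Println(match(entries, "app.bsky.feed.post"))       // false
}
```

The same rules back both `Policy.Allows` paths; the only difference between the `indexed` and `readThrough` lists is which env var they are parsed from.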
+101
packages/api/internal/index/processor.go
··· 1 + package index 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + 8 + "tangled.org/desertthunder.dev/twister/internal/normalize" 9 + "tangled.org/desertthunder.dev/twister/internal/store" 10 + "tangled.org/desertthunder.dev/twister/internal/xrpc" 11 + ) 12 + 13 + type Result struct { 14 + Decision string 15 + DocumentID string 16 + Collection string 17 + CID string 18 + } 19 + 20 + type Processor struct { 21 + store store.IndexingStore 22 + registry *normalize.Registry 23 + xrpc *xrpc.Client 24 + policy Policy 25 + log *slog.Logger 26 + } 27 + 28 + func NewProcessor( 29 + st store.IndexingStore, registry *normalize.Registry, xrpcClient *xrpc.Client, 30 + policy Policy, log *slog.Logger, 31 + ) *Processor { 32 + if log == nil { 33 + log = slog.Default() 34 + } 35 + return &Processor{store: st, registry: registry, xrpc: xrpcClient, policy: policy, log: log} 36 + } 37 + 38 + func (p *Processor) ProcessRecord( 39 + ctx context.Context, source string, event normalize.TapRecordEvent, 40 + ) (*Result, error) { 41 + if event.Record == nil { 42 + return &Result{Decision: "skip_missing_record"}, nil 43 + } 44 + record := event.Record 45 + result := &Result{ 46 + Decision: "skip_unknown", 47 + DocumentID: normalize.StableID(record.DID, record.Collection, record.RKey), 48 + Collection: record.Collection, 49 + CID: record.CID, 50 + } 51 + if !p.policy.Allows(source, record.Collection) { 52 + result.Decision = "skip_collection" 53 + return result, nil 54 + } 55 + if handler, ok := p.registry.StateHandler(record.Collection); ok { 56 + if record.Action == "delete" { 57 + result.Decision = "skip_state_delete" 58 + return result, nil 59 + } 60 + update, err := handler.HandleState(event) 61 + if err != nil { 62 + return result, &PermanentError{Decision: "invalid_state_record", Err: err} 63 + } 64 + if err := p.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil { 65 + return result, fmt.Errorf("update state: %w", err) 66 + } 67 + result.Decision 
= "state_updated" 68 + return result, nil 69 + } 70 + adapter, ok := p.registry.Adapter(record.Collection) 71 + if !ok { 72 + result.Decision = "skip_unsupported_collection" 73 + return result, nil 74 + } 75 + if record.Action == "delete" { 76 + if err := p.store.MarkDeleted(ctx, result.DocumentID); err != nil { 77 + return result, fmt.Errorf("mark deleted: %w", err) 78 + } 79 + result.Decision = "document_deleted" 80 + return result, nil 81 + } 82 + if record.Record == nil { 83 + return result, &PermanentError{Decision: "missing_record_payload"} 84 + } 85 + if !adapter.Searchable(record.Record) { 86 + result.Decision = "skip_unsearchable" 87 + return result, nil 88 + } 89 + doc, err := adapter.Normalize(event) 90 + if err != nil { 91 + return result, &PermanentError{Decision: "normalize_failed", Err: err} 92 + } 93 + if err := p.enrichDocument(ctx, doc, record.Record); err != nil { 94 + return result, err 95 + } 96 + if err := p.store.UpsertDocument(ctx, doc); err != nil { 97 + return result, fmt.Errorf("upsert document: %w", err) 98 + } 99 + result.Decision = "document_upserted" 100 + return result, nil 101 + }
+36 -189
packages/api/internal/ingest/ingest.go
··· 10 10 "sync" 11 11 "time" 12 12 13 + idx "tangled.org/desertthunder.dev/twister/internal/index" 13 14 "tangled.org/desertthunder.dev/twister/internal/normalize" 14 15 "tangled.org/desertthunder.dev/twister/internal/store" 15 16 "tangled.org/desertthunder.dev/twister/internal/xrpc" ··· 30 31 // Runner ingests Tap events into the store. 31 32 type Runner struct { 32 33 store store.Store 33 - registry *normalize.Registry 34 34 tap client 35 - xrpcClient *xrpc.Client 36 - allowlist allowlist 35 + registry *normalize.Registry 36 + policy idx.Policy 37 + processor *idx.Processor 37 38 consumerName string 38 39 log *slog.Logger 39 40 resumeCursor int64 ··· 47 48 if log == nil { 48 49 log = slog.Default() 49 50 } 51 + policy := idx.NewPolicy(indexedCollections, indexedCollections, idx.ReadThroughMissing) 50 52 return &Runner{ 51 53 store: st, 52 - registry: registry, 53 54 tap: tap, 54 - allowlist: parseAllowlist(indexedCollections), 55 + registry: registry, 56 + policy: policy, 57 + processor: idx.NewProcessor(st, registry, nil, policy, log), 55 58 consumerName: defaultConsumerName, 56 59 log: log, 57 60 } ··· 59 62 60 63 // SetXRPCClient enables ingest-time enrichment via XRPC lookups. 
61 64 func (r *Runner) SetXRPCClient(c *xrpc.Client) { 62 - r.xrpcClient = c 65 + if c == nil { 66 + return 67 + } 68 + r.processor = idx.NewProcessor(r.store, r.registry, c, r.policy, r.log) 63 69 } 64 70 65 71 func (r *Runner) Run(ctx context.Context) error { ··· 184 190 } 185 191 186 192 func (r *Runner) processRecordEvent(ctx context.Context, event normalize.TapRecordEvent) error { 187 - if event.Record == nil { 188 - return r.advanceCursorAndAck(ctx, event.ID) 189 - } 190 - 191 193 record := event.Record 192 - if !r.allowlist.match(record.Collection) { 193 - return r.advanceCursorAndAck(ctx, event.ID) 194 - } 195 - 196 - if handler, ok := r.registry.StateHandler(record.Collection); ok { 197 - if record.Action == "delete" { 198 - return r.advanceCursorAndAck(ctx, event.ID) 199 - } 200 - update, err := handler.HandleState(event) 201 - if err != nil { 202 - r.log.Warn("state normalization failed", 203 - slog.Int64("event_id", event.ID), 204 - slog.String("collection", record.Collection), 205 - slog.String("did", record.DID), 206 - slog.String("rkey", record.RKey), 207 - slog.String("error", err.Error()), 208 - ) 209 - return r.advanceCursorAndAck(ctx, event.ID) 210 - } 211 - if err := r.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil { 212 - return err 213 - } 214 - return r.advanceCursorAndAck(ctx, event.ID) 215 - } 216 - 217 - adapter, ok := r.registry.Adapter(record.Collection) 218 - if !ok { 219 - return r.advanceCursorAndAck(ctx, event.ID) 220 - } 221 - 222 - switch record.Action { 223 - case "delete": 224 - docID := normalize.StableID(record.DID, record.Collection, record.RKey) 225 - if err := r.store.MarkDeleted(ctx, docID); err != nil { 226 - return err 227 - } 228 - return r.advanceCursorAndAck(ctx, event.ID) 229 - case "create", "update": 230 - if record.Record == nil { 231 - r.log.Warn("record payload missing", 232 - slog.Int64("event_id", event.ID), 233 - slog.String("collection", record.Collection), 234 - slog.String("did", 
record.DID), 235 - slog.String("rkey", record.RKey), 236 - ) 237 - return r.advanceCursorAndAck(ctx, event.ID) 238 - } 239 - default: 240 - return r.advanceCursorAndAck(ctx, event.ID) 241 - } 242 - 243 - doc, err := adapter.Normalize(event) 244 - if err != nil { 245 - r.log.Warn("normalization failed", 194 + result, err := r.processor.ProcessRecord(ctx, store.IndexSourceTap, event) 195 + if perr, ok := idx.IsPermanent(err); ok { 196 + r.log.Warn("tap processing skipped", 246 197 slog.Int64("event_id", event.ID), 247 198 slog.String("collection", record.Collection), 248 199 slog.String("did", record.DID), 249 200 slog.String("rkey", record.RKey), 250 - slog.String("error", err.Error()), 201 + slog.String("decision", perr.Decision), 202 + slog.String("error", perr.Error()), 251 203 ) 204 + _ = r.store.AppendIndexingAudit(ctx, store.IndexingAuditInput{ 205 + Source: store.IndexSourceTap, 206 + DocumentID: normalize.StableID(record.DID, record.Collection, record.RKey), 207 + Collection: record.Collection, 208 + CID: record.CID, 209 + Decision: perr.Decision, 210 + Error: perr.Error(), 211 + }) 252 212 return r.advanceCursorAndAck(ctx, event.ID) 253 213 } 254 - 255 - handle, err := r.store.GetIdentityHandle(ctx, record.DID) 256 214 if err != nil { 257 215 return err 258 216 } 259 - if handle != "" { 260 - doc.AuthorHandle = handle 261 - if doc.RecordType == "profile" { 262 - doc.Title = handle 263 - } 264 - } 265 - 266 - r.enrichDocument(ctx, doc) 267 - 268 - if err := r.store.UpsertDocument(ctx, doc); err != nil { 269 - return err 217 + if result != nil { 218 + _ = r.store.AppendIndexingAudit(ctx, store.IndexingAuditInput{ 219 + Source: store.IndexSourceTap, 220 + DocumentID: result.DocumentID, 221 + Collection: result.Collection, 222 + CID: result.CID, 223 + Decision: result.Decision, 224 + }) 270 225 } 271 - 272 226 return r.advanceCursorAndAck(ctx, event.ID) 273 227 } 274 228 275 229 func (r *Runner) advanceCursorAndAck(ctx context.Context, eventID int64) error { 
276 230 cursor := fmt.Sprintf("%d", eventID) 277 - if err := r.tap.AckEvent(ctx, eventID); err != nil { 231 + if err := r.persistCursorWithRetry(ctx, cursor, eventID); err != nil { 278 232 return err 279 233 } 280 - if err := r.persistCursorWithRetry(ctx, cursor, eventID); err != nil { 234 + if err := r.tap.AckEvent(ctx, eventID); err != nil { 281 235 return err 282 236 } 283 237 r.markProcessed(cursor) ··· 295 249 } else { 296 250 attempt++ 297 251 backoff := retryBackoff(attempt) 298 - r.log.Error("cursor persist failed after ack", 252 + r.log.Error("cursor persist failed before ack", 299 253 slog.Int64("event_id", eventID), 300 254 slog.Int("attempt", attempt), 301 255 slog.Duration("retry_in", backoff), ··· 344 298 r.lastCursor = cursor 345 299 r.processedTick++ 346 300 r.statusMu.Unlock() 347 - } 348 - 349 - type allowlist struct { 350 - entries []string 351 - } 352 - 353 - func parseAllowlist(raw string) allowlist { 354 - if strings.TrimSpace(raw) == "" { 355 - return allowlist{entries: nil} 356 - } 357 - parts := strings.FieldsFunc(raw, func(r rune) bool { 358 - return r == ',' || r == ' ' || r == '\n' || r == '\t' 359 - }) 360 - entries := make([]string, 0, len(parts)) 361 - for _, part := range parts { 362 - entry := strings.TrimSpace(part) 363 - if entry == "" { 364 - continue 365 - } 366 - entries = append(entries, entry) 367 - } 368 - return allowlist{entries: entries} 369 - } 370 - 371 - func (a allowlist) match(collection string) bool { 372 - if len(a.entries) == 0 { 373 - return true 374 - } 375 - for _, entry := range a.entries { 376 - if entry == collection { 377 - return true 378 - } 379 - if strings.HasSuffix(entry, "*") { 380 - prefix := strings.TrimSuffix(entry, "*") 381 - if strings.HasPrefix(collection, prefix) { 382 - return true 383 - } 384 - } 385 - } 386 - return false 387 - } 388 - 389 - // enrichDocument fills RepoName, AuthorHandle, and WebURL via XRPC when possible. 390 - // Failures are logged but never block ingestion. 
391 - func (r *Runner) enrichDocument(ctx context.Context, doc *store.Document) { 392 - if r.xrpcClient == nil { 393 - return 394 - } 395 - 396 - if doc.RepoDID != "" && doc.RepoName == "" { 397 - repoRKey := extractRepoRKey(doc.ATURI, doc.Collection) 398 - if repoRKey != "" { 399 - name, err := r.xrpcClient.ResolveRepoName(ctx, doc.RepoDID, repoRKey) 400 - if err != nil { 401 - r.log.Debug("enrich: resolve repo name failed", 402 - slog.String("doc_id", doc.ID), 403 - slog.String("repo_did", doc.RepoDID), 404 - slog.String("error", err.Error()), 405 - ) 406 - } else { 407 - doc.RepoName = name 408 - } 409 - } 410 - } 411 - 412 - if doc.AuthorHandle == "" && doc.DID != "" { 413 - info, err := r.xrpcClient.ResolveIdentity(ctx, doc.DID) 414 - if err != nil { 415 - r.log.Debug("enrich: resolve author handle failed", 416 - slog.String("doc_id", doc.ID), 417 - slog.String("did", doc.DID), 418 - slog.String("error", err.Error()), 419 - ) 420 - } else if info.Handle != "" { 421 - doc.AuthorHandle = info.Handle 422 - } 423 - } 424 - 425 - if doc.WebURL == "" { 426 - ownerHandle := doc.AuthorHandle 427 - if doc.RepoDID != "" && doc.RepoDID != doc.DID { 428 - repoOwnerHandle, err := r.store.GetIdentityHandle(ctx, doc.RepoDID) 429 - if err == nil && repoOwnerHandle != "" { 430 - ownerHandle = repoOwnerHandle 431 - } else if r.xrpcClient != nil { 432 - info, err := r.xrpcClient.ResolveIdentity(ctx, doc.RepoDID) 433 - if err == nil && info.Handle != "" { 434 - ownerHandle = info.Handle 435 - } 436 - } 437 - } 438 - doc.WebURL = xrpc.BuildWebURL(ownerHandle, doc.RepoName, doc.RecordType, doc.RKey) 439 - } 440 - } 441 - 442 - // extractRepoRKey attempts to extract the repo rkey from the document context. 443 - // For repo-scoped collections like sh.tangled.repo.issue, the AT-URI is 444 - // at://did/collection/rkey but the repo is identified by RepoDID. We look 445 - // for a stored repo document, or try common rkey patterns. 
446 - func extractRepoRKey(atURI, collection string) string { 447 - if collection == "sh.tangled.repo" { 448 - parts := strings.SplitN(atURI, "/", 5) 449 - if len(parts) >= 5 { 450 - return parts[4] 451 - } 452 - } 453 - return "" 454 301 } 455 302 456 303 func retryBackoff(attempt int) time.Duration {
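The ordering change in `advanceCursorAndAck` (persist the cursor first, ack second, reversing the old ack-first order) buys at-least-once delivery: a crash between the two steps now causes a redelivery absorbed by the idempotent upsert, where the old order could drop an event whose cursor never persisted. A toy sketch with hypothetical types, not the real `Runner`:

```go
package main

import "fmt"

// fakeConsumer stands in for the Runner: cursor is the durable sync state,
// processed counts how many times each event was handled.
type fakeConsumer struct {
	cursor    int64
	processed map[int64]int
}

// handle processes an event, persists the cursor, then acks upstream.
// crashBeforeAck simulates dying between the persist and the ack.
func (c *fakeConsumer) handle(eventID int64, crashBeforeAck bool) (acked bool) {
	c.processed[eventID]++
	c.cursor = eventID // step 1: persist cursor (durable)
	if crashBeforeAck {
		return false // no ack sent; upstream will redeliver
	}
	return true // step 2: ack upstream
}

func main() {
	c := &fakeConsumer{processed: map[int64]int{}}
	c.handle(41, true)  // crash after persisting, before acking
	c.handle(41, false) // redelivery: duplicate work, nothing lost
	fmt.Println(c.cursor, c.processed[41]) // 41 2
}
```

This is also why the test was renamed from `TestRunner_AckBeforeCursorPersist` to `TestRunner_PersistCursorBeforeAck` and its assertion inverted.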
+39 -10
packages/api/internal/ingest/ingest_test.go
··· 6 6 "log/slog" 7 7 "testing" 8 8 9 + idx "tangled.org/desertthunder.dev/twister/internal/index" 9 10 "tangled.org/desertthunder.dev/twister/internal/normalize" 10 11 "tangled.org/desertthunder.dev/twister/internal/store" 11 12 ) ··· 36 37 initialSync *store.SyncState 37 38 recordStates map[string]string 38 39 handles map[string]string 39 - onSetSync func() 40 + jobs map[string]*store.IndexingJob 41 + audits []store.IndexingAuditInput 42 + onSetSync func() 40 43 } 41 44 42 45 func newFakeStore() *fakeStore { ··· 44 47 docs: make(map[string]*store.Document), 45 48 deleted: make(map[string]bool), 46 49 recordStates: make(map[string]string), 47 - handles: make(map[string]string), 50 + handles: make(map[string]string), 51 + jobs: make(map[string]*store.IndexingJob), 48 52 } 49 53 } 50 54 ··· 100 104 return nil 101 105 } 102 106 103 - func (f *fakeStore) ClaimIndexingJob(_ context.Context) (*store.IndexingJob, error) { 107 + func (f *fakeStore) GetIndexingJob(_ context.Context, documentID string) (*store.IndexingJob, error) { 108 + return f.jobs[documentID], nil 109 + } 110 + 111 + func (f *fakeStore) ClaimIndexingJob(_ context.Context, _, _ string) (*store.IndexingJob, error) { 104 112 return nil, nil 105 113 } 106 114 ··· 110 118 111 119 func (f *fakeStore) RetryIndexingJob(_ context.Context, _, _, _ string) error { 112 120 return nil 121 + } 122 + 123 + func (f *fakeStore) FailIndexingJob(_ context.Context, _, _, _ string) error { 124 + return nil 125 + } 126 + 127 + func (f *fakeStore) ListIndexingJobs(_ context.Context, _ store.IndexingJobFilter) ([]*store.IndexingJob, error) { 128 + return nil, nil 129 + } 130 + 131 + func (f *fakeStore) GetIndexingJobStats(_ context.Context) (*store.IndexingJobStats, error) { 132 + return &store.IndexingJobStats{}, nil 133 + } 134 + 135 + func (f *fakeStore) AppendIndexingAudit(_ context.Context, input store.IndexingAuditInput) error { 136 + f.audits = append(f.audits, input) 137 + return nil 138 + } 139 + 140 + func (f 
*fakeStore) ListIndexingAudit(_ context.Context, _ store.IndexingAuditFilter) ([]*store.IndexingAuditEntry, error) { 141 + return nil, nil 113 142 } 114 143 115 144 func (f *fakeStore) GetFollowSubjects(_ context.Context, _ string) ([]string, error) { ··· 302 331 } 303 332 304 333 func TestAllowlistMatching(t *testing.T) { 305 - a := parseAllowlist("sh.tangled.repo, sh.tangled.string sh.tangled.actor.*") 306 - if !a.match("sh.tangled.repo") { 334 + policy := idx.NewPolicy("sh.tangled.repo, sh.tangled.string sh.tangled.actor.*", "", idx.ReadThroughMissing) 335 + if !policy.Allows(store.IndexSourceTap, "sh.tangled.repo") { 307 336 t.Fatal("expected exact match") 308 337 } 309 - if !a.match("sh.tangled.actor.profile") { 338 + if !policy.Allows(store.IndexSourceTap, "sh.tangled.actor.profile") { 310 339 t.Fatal("expected wildcard prefix match") 311 340 } 312 - if a.match("app.bsky.feed.post") { 341 + if policy.Allows(store.IndexSourceTap, "app.bsky.feed.post") { 313 342 t.Fatal("unexpected match") 314 343 } 315 344 } ··· 337 366 } 338 367 } 339 368 340 - func TestRunner_AckBeforeCursorPersist(t *testing.T) { 369 + func TestRunner_PersistCursorBeforeAck(t *testing.T) { 341 370 st := newFakeStore() 342 371 tap := &fakeTapClient{} 343 372 r := newRunnerForTest(st, tap, "sh.tangled.*") ··· 345 374 acked := false 346 375 tap.onAck = func(_ int64) { acked = true } 347 376 st.onSetSync = func() { 348 - if !acked { 349 - t.Fatalf("cursor persisted before ack") 377 + if acked { 378 + t.Fatalf("ack happened before cursor persisted") 350 379 } 351 380 } 352 381
+81
packages/api/internal/store/indexing_audit.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "time" 7 + ) 8 + 9 + func (s *SQLStore) AppendIndexingAudit(ctx context.Context, input IndexingAuditInput) error { 10 + now := time.Now().UTC().Format(time.RFC3339) 11 + _, err := s.db.ExecContext(ctx, ` 12 + INSERT INTO indexing_audit ( 13 + source, document_id, collection, cid, decision, attempt, error, created_at 14 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, 15 + input.Source, input.DocumentID, input.Collection, input.CID, 16 + input.Decision, input.Attempt, nullableStr(input.Error), now, 17 + ) 18 + if err != nil { 19 + return fmt.Errorf("append indexing audit: %w", err) 20 + } 21 + return nil 22 + } 23 + 24 + func (s *SQLStore) ListIndexingAudit( 25 + ctx context.Context, filter IndexingAuditFilter, 26 + ) ([]*IndexingAuditEntry, error) { 27 + query := ` 28 + SELECT id, source, document_id, collection, cid, decision, 29 + attempt, COALESCE(error, ''), created_at 30 + FROM indexing_audit 31 + WHERE 1 = 1` 32 + args := []any{} 33 + 34 + if filter.DocumentID != "" { 35 + query += " AND document_id = ?" 36 + args = append(args, filter.DocumentID) 37 + } 38 + if filter.Source != "" { 39 + query += " AND source = ?" 40 + args = append(args, filter.Source) 41 + } 42 + if filter.Decision != "" { 43 + query += " AND decision = ?" 44 + args = append(args, filter.Decision) 45 + } 46 + 47 + query += " ORDER BY created_at DESC" 48 + limit := filter.Limit 49 + if limit <= 0 { 50 + limit = 50 51 + } 52 + query += " LIMIT ?" 53 + args = append(args, limit) 54 + if filter.Offset > 0 { 55 + query += " OFFSET ?" 56 + args = append(args, filter.Offset) 57 + } 58 + 59 + rows, err := s.db.QueryContext(ctx, query, args...) 
60 + if err != nil { 61 + return nil, fmt.Errorf("list indexing audit: %w", err) 62 + } 63 + defer rows.Close() 64 + 65 + var entries []*IndexingAuditEntry 66 + for rows.Next() { 67 + entry := &IndexingAuditEntry{} 68 + if err := rows.Scan( 69 + &entry.ID, &entry.Source, &entry.DocumentID, &entry.Collection, 70 + &entry.CID, &entry.Decision, &entry.Attempt, &entry.Error, 71 + &entry.CreatedAt, 72 + ); err != nil { 73 + return nil, fmt.Errorf("scan indexing audit: %w", err) 74 + } 75 + entries = append(entries, entry) 76 + } 77 + if err := rows.Err(); err != nil { 78 + return nil, fmt.Errorf("iterate indexing audit: %w", err) 79 + } 80 + return entries, nil 81 + }
+284
packages/api/internal/store/indexing_jobs.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "strings" 9 + "time" 10 + ) 11 + 12 + func (s *SQLStore) GetIndexingJob(ctx context.Context, documentID string) (*IndexingJob, error) { 13 + row := s.db.QueryRowContext(ctx, ` 14 + SELECT document_id, did, collection, rkey, cid, record_json, source, 15 + attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at, 16 + COALESCE(lease_owner, ''), COALESCE(lease_expires_at, ''), 17 + COALESCE(completed_at, '') 18 + FROM indexing_jobs 19 + WHERE document_id = ?`, documentID) 20 + 21 + job, err := scanIndexingJob(row) 22 + if errors.Is(err, sql.ErrNoRows) { 23 + return nil, nil 24 + } 25 + if err != nil { 26 + return nil, fmt.Errorf("get indexing job: %w", err) 27 + } 28 + return job, nil 29 + } 30 + 31 + func (s *SQLStore) EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error { 32 + now := time.Now().UTC().Format(time.RFC3339) 33 + source := strings.TrimSpace(input.Source) 34 + if source == "" { 35 + source = IndexSourceReadThrough 36 + } 37 + 38 + _, err := s.db.ExecContext(ctx, ` 39 + INSERT INTO indexing_jobs ( 40 + document_id, did, collection, rkey, cid, record_json, source, 41 + status, attempts, last_error, scheduled_at, updated_at, 42 + lease_owner, lease_expires_at, completed_at 43 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, NULL, ?, ?, '', '', '') 44 + ON CONFLICT(document_id) DO UPDATE SET 45 + did = excluded.did, 46 + collection = excluded.collection, 47 + rkey = excluded.rkey, 48 + cid = excluded.cid, 49 + record_json = excluded.record_json, 50 + source = excluded.source, 51 + status = excluded.status, 52 + last_error = NULL, 53 + scheduled_at = excluded.scheduled_at, 54 + updated_at = excluded.updated_at, 55 + lease_owner = '', 56 + lease_expires_at = '', 57 + completed_at = ''`, 58 + input.DocumentID, input.DID, input.Collection, input.RKey, input.CID, 59 + input.RecordJSON, source, IndexingJobPending, now, now, 60 + ) 61 + if err != 
nil { 62 + return fmt.Errorf("enqueue indexing job: %w", err) 63 + } 64 + return nil 65 + } 66 + 67 + func (s *SQLStore) ClaimIndexingJob( 68 + ctx context.Context, workerID string, leaseUntil string, 69 + ) (*IndexingJob, error) { 70 + now := time.Now().UTC().Format(time.RFC3339) 71 + conn, err := s.db.Conn(ctx) 72 + if err != nil { 73 + return nil, fmt.Errorf("claim indexing job conn: %w", err) 74 + } 75 + defer conn.Close() 76 + 77 + if _, err := conn.ExecContext(ctx, `BEGIN IMMEDIATE`); err != nil { 78 + return nil, fmt.Errorf("begin immediate claim tx: %w", err) 79 + } 80 + committed := false 81 + defer func() { 82 + if !committed { 83 + _, _ = conn.ExecContext(context.Background(), `ROLLBACK`) 84 + } 85 + }() 86 + 87 + var documentID string 88 + err = conn.QueryRowContext(ctx, ` 89 + SELECT document_id 90 + FROM indexing_jobs 91 + WHERE (status = ? AND datetime(scheduled_at) <= datetime(?)) 92 + OR ( 93 + status = ? 94 + AND lease_expires_at != '' 95 + AND datetime(lease_expires_at) <= datetime(?) 96 + ) 97 + ORDER BY scheduled_at ASC, updated_at ASC 98 + LIMIT 1`, 99 + IndexingJobPending, now, IndexingJobProcessing, now, 100 + ).Scan(&documentID) 101 + if errors.Is(err, sql.ErrNoRows) { 102 + if _, err := conn.ExecContext(ctx, `COMMIT`); err != nil { 103 + return nil, fmt.Errorf("commit empty claim tx: %w", err) 104 + } 105 + committed = true 106 + return nil, nil 107 + } 108 + if err != nil { 109 + return nil, fmt.Errorf("select claim candidate: %w", err) 110 + } 111 + 112 + row := conn.QueryRowContext(ctx, ` 113 + UPDATE indexing_jobs 114 + SET status = ?, updated_at = ?, lease_owner = ?, lease_expires_at = ? 115 + WHERE document_id = ? 
116 + RETURNING document_id, did, collection, rkey, cid, record_json, source, 117 + attempts, status, COALESCE(last_error, ''), scheduled_at, 118 + updated_at, COALESCE(lease_owner, ''), 119 + COALESCE(lease_expires_at, ''), COALESCE(completed_at, '')`, 120 + IndexingJobProcessing, now, workerID, leaseUntil, documentID, 121 + ) 122 + 123 + job, err := scanIndexingJob(row) 124 + if err != nil { 125 + return nil, fmt.Errorf("update claim candidate: %w", err) 126 + } 127 + if _, err := conn.ExecContext(ctx, `COMMIT`); err != nil { 128 + return nil, fmt.Errorf("commit claim tx: %w", err) 129 + } 130 + committed = true 131 + return job, nil 132 + } 133 + 134 + func (s *SQLStore) CompleteIndexingJob(ctx context.Context, documentID string) error { 135 + now := time.Now().UTC().Format(time.RFC3339) 136 + _, err := s.db.ExecContext(ctx, ` 137 + UPDATE indexing_jobs 138 + SET status = ?, updated_at = ?, completed_at = ?, 139 + lease_owner = '', lease_expires_at = '', last_error = NULL 140 + WHERE document_id = ?`, 141 + IndexingJobCompleted, now, now, documentID, 142 + ) 143 + if err != nil { 144 + return fmt.Errorf("complete indexing job: %w", err) 145 + } 146 + return nil 147 + } 148 + 149 + func (s *SQLStore) RetryIndexingJob( 150 + ctx context.Context, documentID string, nextScheduledAt string, lastError string, 151 + ) error { 152 + now := time.Now().UTC().Format(time.RFC3339) 153 + _, err := s.db.ExecContext(ctx, ` 154 + UPDATE indexing_jobs 155 + SET status = ?, attempts = attempts + 1, last_error = ?, scheduled_at = ?, 156 + updated_at = ?, lease_owner = '', lease_expires_at = '' 157 + WHERE document_id = ?`, 158 + IndexingJobPending, lastError, nextScheduledAt, now, documentID, 159 + ) 160 + if err != nil { 161 + return fmt.Errorf("retry indexing job: %w", err) 162 + } 163 + return nil 164 + } 165 + 166 + func (s *SQLStore) FailIndexingJob( 167 + ctx context.Context, documentID string, status string, lastError string, 168 + ) error { 169 + now := 
time.Now().UTC().Format(time.RFC3339) 170 + _, err := s.db.ExecContext(ctx, ` 171 + UPDATE indexing_jobs 172 + SET status = ?, attempts = attempts + 1, last_error = ?, updated_at = ?, 173 + lease_owner = '', lease_expires_at = '' 174 + WHERE document_id = ?`, 175 + status, lastError, now, documentID, 176 + ) 177 + if err != nil { 178 + return fmt.Errorf("fail indexing job: %w", err) 179 + } 180 + return nil 181 + } 182 + 183 + func (s *SQLStore) ListIndexingJobs( 184 + ctx context.Context, filter IndexingJobFilter, 185 + ) ([]*IndexingJob, error) { 186 + query := ` 187 + SELECT document_id, did, collection, rkey, cid, record_json, source, 188 + attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at, 189 + COALESCE(lease_owner, ''), COALESCE(lease_expires_at, ''), 190 + COALESCE(completed_at, '') 191 + FROM indexing_jobs 192 + WHERE 1 = 1` 193 + args := []any{} 194 + 195 + if filter.DocumentID != "" { 196 + query += " AND document_id = ?" 197 + args = append(args, filter.DocumentID) 198 + } 199 + if filter.Status != "" { 200 + query += " AND status = ?" 201 + args = append(args, filter.Status) 202 + } 203 + if filter.Source != "" { 204 + query += " AND source = ?" 205 + args = append(args, filter.Source) 206 + } 207 + 208 + query += " ORDER BY updated_at DESC" 209 + limit := filter.Limit 210 + if limit <= 0 { 211 + limit = 50 212 + } 213 + query += " LIMIT ?" 214 + args = append(args, limit) 215 + if filter.Offset > 0 { 216 + query += " OFFSET ?" 217 + args = append(args, filter.Offset) 218 + } 219 + 220 + rows, err := s.db.QueryContext(ctx, query, args...) 
221 + if err != nil { 222 + return nil, fmt.Errorf("list indexing jobs: %w", err) 223 + } 224 + defer rows.Close() 225 + 226 + var jobs []*IndexingJob 227 + for rows.Next() { 228 + job, err := scanIndexingJob(rows) 229 + if err != nil { 230 + return nil, fmt.Errorf("scan indexing job: %w", err) 231 + } 232 + jobs = append(jobs, job) 233 + } 234 + if err := rows.Err(); err != nil { 235 + return nil, fmt.Errorf("iterate indexing jobs: %w", err) 236 + } 237 + return jobs, nil 238 + } 239 + 240 + func (s *SQLStore) GetIndexingJobStats(ctx context.Context) (*IndexingJobStats, error) { 241 + row := s.db.QueryRowContext(ctx, ` 242 + SELECT 243 + COUNT(*) FILTER (WHERE status = ?), 244 + COUNT(*) FILTER (WHERE status = ?), 245 + COUNT(*) FILTER (WHERE status = ?), 246 + COUNT(*) FILTER (WHERE status = ?), 247 + COUNT(*) FILTER (WHERE status = ?), 248 + COALESCE(MIN(CASE WHEN status = ? THEN scheduled_at END), ''), 249 + COALESCE(MIN(CASE WHEN status = ? THEN updated_at END), ''), 250 + COALESCE(MAX(completed_at), ''), 251 + COALESCE(MAX(updated_at), '') 252 + FROM indexing_jobs`, 253 + IndexingJobPending, IndexingJobProcessing, IndexingJobCompleted, 254 + IndexingJobFailed, IndexingJobDeadLetter, IndexingJobPending, 255 + IndexingJobProcessing, 256 + ) 257 + 258 + stats := &IndexingJobStats{} 259 + err := row.Scan( 260 + &stats.Pending, &stats.Processing, &stats.Completed, &stats.Failed, 261 + &stats.DeadLetter, &stats.OldestPendingAt, &stats.OldestRunningAt, 262 + &stats.LastCompletedAt, &stats.LastProcessedAt, 263 + ) 264 + if err != nil { 265 + return nil, fmt.Errorf("get indexing job stats: %w", err) 266 + } 267 + return stats, nil 268 + } 269 + 270 + func scanIndexingJob(scanner interface { 271 + Scan(dest ...any) error 272 + }) (*IndexingJob, error) { 273 + job := &IndexingJob{} 274 + err := scanner.Scan( 275 + &job.DocumentID, &job.DID, &job.Collection, &job.RKey, &job.CID, 276 + &job.RecordJSON, &job.Source, &job.Attempts, &job.Status, 277 + &job.LastError, 
&job.ScheduledAt, &job.UpdatedAt, &job.LeaseOwner, 278 + &job.LeaseUntil, &job.CompletedAt, 279 + ) 280 + if err != nil { 281 + return nil, err 282 + } 283 + return job, nil 284 + }
+74
packages/api/internal/store/indexing_types.go
··· 1 + package store 2 + 3 + import "context" 4 + 5 + const ( 6 + IndexSourceTap = "tap" 7 + IndexSourceReadThrough = "read_through" 8 + IndexSourceBackfill = "backfill" 9 + IndexSourceAdmin = "admin" 10 + 11 + IndexingJobPending = "pending" 12 + IndexingJobProcessing = "processing" 13 + IndexingJobCompleted = "completed" 14 + IndexingJobFailed = "failed" 15 + IndexingJobDeadLetter = "dead_letter" 16 + ) 17 + 18 + type IndexingJobFilter struct { 19 + Status string 20 + Source string 21 + DocumentID string 22 + Limit int 23 + Offset int 24 + } 25 + 26 + type IndexingAuditFilter struct { 27 + Source string 28 + Decision string 29 + DocumentID string 30 + Limit int 31 + Offset int 32 + } 33 + 34 + type IndexingJobStats struct { 35 + Pending int64 36 + Processing int64 37 + Completed int64 38 + Failed int64 39 + DeadLetter int64 40 + OldestPendingAt string 41 + OldestRunningAt string 42 + LastCompletedAt string 43 + LastProcessedAt string 44 + } 45 + 46 + type IndexingAuditEntry struct { 47 + ID int64 48 + Source string 49 + DocumentID string 50 + Collection string 51 + CID string 52 + Decision string 53 + Attempt int 54 + Error string 55 + CreatedAt string 56 + } 57 + 58 + type IndexingAuditInput struct { 59 + Source string 60 + DocumentID string 61 + Collection string 62 + CID string 63 + Decision string 64 + Attempt int 65 + Error string 66 + } 67 + 68 + type IndexingStore interface { 69 + GetDocument(ctx context.Context, id string) (*Document, error) 70 + GetIdentityHandle(ctx context.Context, did string) (string, error) 71 + MarkDeleted(ctx context.Context, id string) error 72 + UpdateRecordState(ctx context.Context, subjectURI string, state string) error 73 + UpsertDocument(ctx context.Context, doc *Document) error 74 + }
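The `DocumentID` strings threaded through these types appear to be pipe-joined identity pieces (`did|collection|rkey`), a layout inferred from the fixture IDs in `store_test.go` rather than from any documented contract. A sketch of composing and splitting them under that assumption:

```go
package main

import (
	"fmt"
	"strings"
)

// documentID joins the identity pieces with "|", matching fixture IDs
// like "did:plc:owner|sh.tangled.repo|repo1". The separator is an
// inference from the tests, not a documented format.
func documentID(did, collection, rkey string) string {
	return strings.Join([]string{did, collection, rkey}, "|")
}

// splitDocumentID is the inverse; it reports ok=false for malformed IDs.
func splitDocumentID(id string) (did, collection, rkey string, ok bool) {
	parts := strings.SplitN(id, "|", 3)
	if len(parts) != 3 || parts[0] == "" {
		return "", "", "", false
	}
	return parts[0], parts[1], parts[2], true
}

func main() {
	id := documentID("did:plc:owner", "sh.tangled.repo", "repo1")
	fmt.Println(id)
	fmt.Println(splitDocumentID(id))
}
```

Note `SplitN` with a limit of 3: DIDs contain colons but no pipes, so only the first two separators are structural.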
+32
packages/api/internal/store/migrations/008_indexing_observability.sql
··· 1 + ALTER TABLE indexing_jobs 2 + ADD COLUMN source TEXT NOT NULL DEFAULT 'read_through'; 3 + 4 + ALTER TABLE indexing_jobs 5 + ADD COLUMN lease_owner TEXT DEFAULT ''; 6 + 7 + ALTER TABLE indexing_jobs 8 + ADD COLUMN lease_expires_at TEXT DEFAULT ''; 9 + 10 + ALTER TABLE indexing_jobs 11 + ADD COLUMN completed_at TEXT DEFAULT ''; 12 + 13 + CREATE INDEX IF NOT EXISTS idx_indexing_jobs_claim 14 + ON indexing_jobs(status, scheduled_at, lease_expires_at, updated_at); 15 + 16 + CREATE TABLE IF NOT EXISTS indexing_audit ( 17 + id INTEGER PRIMARY KEY AUTOINCREMENT, 18 + source TEXT NOT NULL, 19 + document_id TEXT NOT NULL, 20 + collection TEXT NOT NULL, 21 + cid TEXT NOT NULL, 22 + decision TEXT NOT NULL, 23 + attempt INTEGER NOT NULL DEFAULT 0, 24 + error TEXT, 25 + created_at TEXT NOT NULL 26 + ); 27 + 28 + CREATE INDEX IF NOT EXISTS idx_indexing_audit_created 29 + ON indexing_audit(created_at DESC); 30 + 31 + CREATE INDEX IF NOT EXISTS idx_indexing_audit_document 32 + ON indexing_audit(document_id, created_at DESC);
-116
packages/api/internal/store/sql_store.go
··· 253 253 return handle.String, nil 254 254 } 255 255 256 - func (s *SQLStore) EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error { 257 - now := time.Now().UTC().Format(time.RFC3339) 258 - _, err := s.db.ExecContext(ctx, ` 259 - INSERT INTO indexing_jobs ( 260 - document_id, did, collection, rkey, cid, record_json, 261 - status, attempts, last_error, scheduled_at, updated_at 262 - ) VALUES (?, ?, ?, ?, ?, ?, 'pending', 0, NULL, ?, ?) 263 - ON CONFLICT(document_id) DO UPDATE SET 264 - did = excluded.did, 265 - collection = excluded.collection, 266 - rkey = excluded.rkey, 267 - cid = excluded.cid, 268 - record_json = excluded.record_json, 269 - status = 'pending', 270 - last_error = NULL, 271 - scheduled_at = excluded.scheduled_at, 272 - updated_at = excluded.updated_at`, 273 - input.DocumentID, input.DID, input.Collection, input.RKey, input.CID, input.RecordJSON, now, now, 274 - ) 275 - if err != nil { 276 - return fmt.Errorf("enqueue indexing job: %w", err) 277 - } 278 - return nil 279 - } 280 - 281 - func (s *SQLStore) ClaimIndexingJob(ctx context.Context) (*IndexingJob, error) { 282 - now := time.Now().UTC().Format(time.RFC3339) 283 - staleCutoff := time.Now().UTC().Add(-5 * time.Minute).Format(time.RFC3339) 284 - 285 - tx, err := s.db.BeginTx(ctx, nil) 286 - if err != nil { 287 - return nil, fmt.Errorf("begin claim indexing job tx: %w", err) 288 - } 289 - defer tx.Rollback() 290 - 291 - var job IndexingJob 292 - row := tx.QueryRowContext(ctx, ` 293 - SELECT document_id, did, collection, rkey, cid, record_json, 294 - attempts, status, COALESCE(last_error, ''), scheduled_at, updated_at 295 - FROM indexing_jobs 296 - WHERE (status = 'pending' AND scheduled_at <= ?) 297 - OR (status = 'processing' AND updated_at <= ?) 
298 - ORDER BY scheduled_at ASC, updated_at ASC 299 - LIMIT 1`, now, staleCutoff) 300 - 301 - err = row.Scan( 302 - &job.DocumentID, 303 - &job.DID, 304 - &job.Collection, 305 - &job.RKey, 306 - &job.CID, 307 - &job.RecordJSON, 308 - &job.Attempts, 309 - &job.Status, 310 - &job.LastError, 311 - &job.ScheduledAt, 312 - &job.UpdatedAt, 313 - ) 314 - if errors.Is(err, sql.ErrNoRows) { 315 - return nil, nil 316 - } 317 - if err != nil { 318 - return nil, fmt.Errorf("select indexing job: %w", err) 319 - } 320 - 321 - res, err := tx.ExecContext(ctx, ` 322 - UPDATE indexing_jobs 323 - SET status = 'processing', updated_at = ? 324 - WHERE document_id = ?`, 325 - now, job.DocumentID, 326 - ) 327 - if err != nil { 328 - return nil, fmt.Errorf("mark indexing job processing: %w", err) 329 - } 330 - affected, err := res.RowsAffected() 331 - if err != nil { 332 - return nil, fmt.Errorf("rows affected claim indexing job: %w", err) 333 - } 334 - if affected == 0 { 335 - return nil, nil 336 - } 337 - 338 - if err := tx.Commit(); err != nil { 339 - return nil, fmt.Errorf("commit claim indexing job tx: %w", err) 340 - } 341 - job.Status = "processing" 342 - job.UpdatedAt = now 343 - return &job, nil 344 - } 345 - 346 - func (s *SQLStore) CompleteIndexingJob(ctx context.Context, documentID string) error { 347 - _, err := s.db.ExecContext(ctx, `DELETE FROM indexing_jobs WHERE document_id = ?`, documentID) 348 - if err != nil { 349 - return fmt.Errorf("complete indexing job: %w", err) 350 - } 351 - return nil 352 - } 353 - 354 - func (s *SQLStore) RetryIndexingJob(ctx context.Context, documentID string, nextScheduledAt string, lastError string) error { 355 - now := time.Now().UTC().Format(time.RFC3339) 356 - _, err := s.db.ExecContext(ctx, ` 357 - UPDATE indexing_jobs 358 - SET status = 'pending', 359 - attempts = attempts + 1, 360 - last_error = ?, 361 - scheduled_at = ?, 362 - updated_at = ? 
363 - WHERE document_id = ?`, 364 - lastError, nextScheduledAt, now, documentID, 365 - ) 366 - if err != nil { 367 - return fmt.Errorf("retry indexing job: %w", err) 368 - } 369 - return nil 370 - } 371 - 372 256 func (s *SQLStore) GetFollowSubjects(ctx context.Context, did string) ([]string, error) { 373 257 rows, err := s.db.QueryContext(ctx, ` 374 258 SELECT DISTINCT repo_did
+12 -1
packages/api/internal/store/store.go
··· 49 49 RKey string 50 50 CID string 51 51 RecordJSON string 52 + Source string 52 53 Attempts int 53 54 Status string 54 55 LastError string 55 56 ScheduledAt string 56 57 UpdatedAt string 58 + LeaseOwner string 59 + LeaseUntil string 60 + CompletedAt string 57 61 } 58 62 59 63 // JetstreamEvent is a cached JetStream activity event. ··· 86 90 RKey string 87 91 CID string 88 92 RecordJSON string 93 + Source string 89 94 } 90 95 91 96 // DocumentFilter scopes a ListDocuments query to a subset of documents. ··· 110 115 UpdateRecordState(ctx context.Context, subjectURI string, state string) error 111 116 UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error 112 117 GetIdentityHandle(ctx context.Context, did string) (string, error) 118 + GetIndexingJob(ctx context.Context, documentID string) (*IndexingJob, error) 113 119 EnqueueIndexingJob(ctx context.Context, input IndexingJobInput) error 114 - ClaimIndexingJob(ctx context.Context) (*IndexingJob, error) 120 + ClaimIndexingJob(ctx context.Context, workerID string, leaseUntil string) (*IndexingJob, error) 115 121 CompleteIndexingJob(ctx context.Context, documentID string) error 116 122 RetryIndexingJob(ctx context.Context, documentID string, nextScheduledAt string, lastError string) error 123 + FailIndexingJob(ctx context.Context, documentID string, status string, lastError string) error 124 + ListIndexingJobs(ctx context.Context, filter IndexingJobFilter) ([]*IndexingJob, error) 125 + GetIndexingJobStats(ctx context.Context) (*IndexingJobStats, error) 126 + AppendIndexingAudit(ctx context.Context, input IndexingAuditInput) error 127 + ListIndexingAudit(ctx context.Context, filter IndexingAuditFilter) ([]*IndexingAuditEntry, error) 117 128 GetFollowSubjects(ctx context.Context, did string) ([]string, error) 118 129 GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) 119 130 CountDocuments(ctx context.Context) (int64, error)
+89 -4
packages/api/internal/store/store_test.go
··· 4 4 "context" 5 5 "os" 6 6 "path/filepath" 7 + "sync" 7 8 "testing" 9 + "time" 8 10 9 11 "tangled.org/desertthunder.dev/twister/internal/store" 10 12 ) ··· 383 385 RKey: "repo1", 384 386 CID: "cid-repo1", 385 387 RecordJSON: `{"name":"repo1"}`, 388 + Source: store.IndexSourceReadThrough, 386 389 } 387 390 if err := st.EnqueueIndexingJob(ctx, job); err != nil { 388 391 t.Fatalf("enqueue indexing job: %v", err) ··· 391 394 t.Fatalf("enqueue indexing job second call: %v", err) 392 395 } 393 396 394 - claimed, err := st.ClaimIndexingJob(ctx) 397 + claimed, err := st.ClaimIndexingJob(ctx, "worker-a", time.Now().Add(time.Minute).Format(time.RFC3339)) 395 398 if err != nil { 396 399 t.Fatalf("claim indexing job: %v", err) 397 400 } ··· 406 409 t.Fatalf("retry indexing job: %v", err) 407 410 } 408 411 409 - none, err := st.ClaimIndexingJob(ctx) 412 + none, err := st.ClaimIndexingJob(ctx, "worker-b", time.Now().Add(time.Minute).Format(time.RFC3339)) 410 413 if err != nil { 411 414 t.Fatalf("claim delayed indexing job: %v", err) 412 415 } ··· 418 421 t.Fatalf("retry indexing job now: %v", err) 419 422 } 420 423 421 - claimed, err = st.ClaimIndexingJob(ctx) 424 + claimed, err = st.ClaimIndexingJob(ctx, "worker-c", time.Now().Add(time.Minute).Format(time.RFC3339)) 422 425 if err != nil { 423 426 t.Fatalf("claim retried indexing job: %v", err) 424 427 } ··· 430 433 t.Fatalf("complete indexing job: %v", err) 431 434 } 432 435 433 - claimed, err = st.ClaimIndexingJob(ctx) 436 + claimed, err = st.ClaimIndexingJob(ctx, "worker-d", time.Now().Add(time.Minute).Format(time.RFC3339)) 434 437 if err != nil { 435 438 t.Fatalf("claim after complete: %v", err) 436 439 } 437 440 if claimed != nil { 438 441 t.Fatalf("expected no job after complete, got %#v", claimed) 442 + } 443 + 444 + got, err := st.GetIndexingJob(ctx, job.DocumentID) 445 + if err != nil { 446 + t.Fatalf("get completed job: %v", err) 447 + } 448 + if got == nil || got.Status != store.IndexingJobCompleted { 449 + 
t.Fatalf("expected completed job row, got %#v", got) 450 + } 451 + }) 452 + 453 + t.Run("indexing claim is single winner", func(t *testing.T) { 454 + job := store.IndexingJobInput{ 455 + DocumentID: "did:plc:owner|sh.tangled.repo|repo2", 456 + DID: "did:plc:owner", 457 + Collection: "sh.tangled.repo", 458 + RKey: "repo2", 459 + CID: "cid-repo2", 460 + RecordJSON: `{"name":"repo2"}`, 461 + Source: store.IndexSourceReadThrough, 462 + } 463 + if err := st.EnqueueIndexingJob(ctx, job); err != nil { 464 + t.Fatalf("enqueue job: %v", err) 465 + } 466 + 467 + var wg sync.WaitGroup 468 + results := make(chan *store.IndexingJob, 2) 469 + errs := make(chan error, 2) 470 + for _, worker := range []string{"worker-1", "worker-2"} { 471 + wg.Add(1) 472 + go func(worker string) { 473 + defer wg.Done() 474 + claimed, err := st.ClaimIndexingJob( 475 + ctx, worker, time.Now().Add(time.Minute).Format(time.RFC3339), 476 + ) 477 + errs <- err 478 + results <- claimed 479 + }(worker) 480 + } 481 + wg.Wait() 482 + close(results) 483 + close(errs) 484 + 485 + claimedCount := 0 486 + for err := range errs { 487 + if err != nil { 488 + t.Fatalf("claim concurrent job: %v", err) 489 + } 490 + } 491 + for claimed := range results { 492 + if claimed != nil { 493 + claimedCount++ 494 + } 495 + } 496 + if claimedCount != 1 { 497 + t.Fatalf("expected exactly one claimant, got %d", claimedCount) 498 + } 499 + }) 500 + 501 + t.Run("indexing audit and stats", func(t *testing.T) { 502 + if err := st.AppendIndexingAudit(ctx, store.IndexingAuditInput{ 503 + Source: store.IndexSourceReadThrough, 504 + DocumentID: "doc-audit", 505 + Collection: "sh.tangled.repo", 506 + CID: "cid-audit", 507 + Decision: "enqueued", 508 + }); err != nil { 509 + t.Fatalf("append indexing audit: %v", err) 510 + } 511 + stats, err := st.GetIndexingJobStats(ctx) 512 + if err != nil { 513 + t.Fatalf("get indexing stats: %v", err) 514 + } 515 + if stats.Completed < 1 { 516 + t.Fatalf("expected completed jobs in stats, got %#v", 
stats) 517 + } 518 + entries, err := st.ListIndexingAudit(ctx, store.IndexingAuditFilter{DocumentID: "doc-audit"}) 519 + if err != nil { 520 + t.Fatalf("list indexing audit: %v", err) 521 + } 522 + if len(entries) != 1 || entries[0].Decision != "enqueued" { 523 + t.Fatalf("unexpected audit rows: %#v", entries) 439 524 } 440 525 }) 441 526 }
-144
packages/api/overview.md
··· 1 - # Twister Backend Overview 2 - 3 - Twister is a Go backend service that indexes Tangled network content and serves 4 - search queries for the Twisted mobile client. It complements Tangled's public 5 - APIs by providing global search and derived data that's hard to compute client-side. 6 - 7 - ## Core Responsibilities 8 - 9 - 1. **Real-time Indexing**: Consumes record changes from Tap (Tangled's event 10 - stream) and indexes them into SQLite FTS5 for full-text search. 11 - 12 - 2. **Search API**: Exposes HTTP endpoints for keyword search over repos, 13 - profiles, issues, pull requests, and follows. 14 - 15 - 3. **Graph Augmentation**: Caches follower relationships and provides profile 16 - summaries not easily derivable from the public knot/PDS APIs. 17 - 18 - ## Architecture 19 - 20 - ```text 21 - ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ 22 - │ Tap │────▶│ Indexer │────▶│ Turso │ 23 - │ (event stream) │ (ingest) │ │ (SQLite) │ 24 - └─────────────┘ └─────────────┘ └──────┬──────┘ 25 - 26 - ┌─────────────┐ ┌─────────────┐ │ 27 - │ Twisted │────▶│ API │◀───────────┘ 28 - │ (mobile) │ │ (search) │ 29 - └─────────────┘ └─────────────┘ 30 - ``` 31 - 32 - ## Commands 33 - 34 - The `twister` binary provides six subcommands: 35 - 36 - - `api` / `serve`: HTTP server for search and proxy endpoints 37 - - `indexer`: Tap consumer that processes record events into the index 38 - - `backfill`: Discovers users from seeds and registers repos with Tap 39 - - `reindex`: Rebuilds the FTS index from existing documents 40 - - `enrich`: Backfills derived fields (handles, URLs) on existing documents 41 - - `healthcheck`: One-shot health probe 42 - 43 - ## Data Flow 44 - 45 - ### Indexing Pipeline (internal/ingest) 46 - 47 - 1. **Tap Consumer** (`internal/tapclient`): Connects to Tap WebSocket, reads 48 - record events with cursor-based resume. 49 - 50 - 2. 
**Normalizers** (`internal/normalize`): Collection-specific adapters convert 51 - raw records into normalized `Document` structs. Each collection (repos, 52 - issues, PRs, follows, profiles) has a dedicated adapter. 53 - 54 - 3. **Store** (`internal/store`): Upserts documents into SQLite with FTS5 index. 55 - Tracks cursor position in `sync_state` table for resume-after-restart. 56 - 57 - ### Search (internal/search) 58 - 59 - - **Keyword Search**: SQLite FTS5 full-text search with BM25 ranking 60 - - **Filters**: By collection, author, repo, language, date range, state 61 - - **Results**: Denormalized documents with snippet highlighting 62 - 63 - ## Key Packages 64 - 65 - | Package | Purpose | 66 - | ------------------------ | ------------------------------------------ | 67 - | `internal/api` | HTTP router, handlers, middleware | 68 - | `internal/store` | Database access layer (Turso/local SQLite) | 69 - | `internal/ingest` | Tap event processing pipeline | 70 - | `internal/normalize` | Record adapters for each collection type | 71 - | `internal/search` | FTS5 query execution | 72 - | `internal/backfill` | Graph discovery and Tap registration | 73 - | `internal/xrpc` | XRPC client for Tangled PDS/knot APIs | 74 - | `internal/constellation` | External service for graph queries | 75 - 76 - ## API Endpoints 77 - 78 - ### Search 79 - 80 - - `GET /search` - Unified search across all collections 81 - - `GET /search/keyword` - FTS5 keyword search 82 - 83 - ### Actors & Repos 84 - 85 - - `GET /actors/{handle}` - Profile data 86 - - `GET /actors/{handle}/repos` - User's repositories 87 - - `GET /actors/{handle}/repos/{repo}` - Repo details (tree, log, branches) 88 - 89 - ### Issues & Pulls 90 - 91 - - `GET /issues/{handle}/{rkey}` - Issue detail 92 - - `GET /pulls/{handle}/{rkey}` - Pull request detail 93 - 94 - ### Identity 95 - 96 - - `GET /identity/resolve` - Resolve handle to DID 97 - - `GET /identity/did/{did}` - DID document lookup 98 - 99 - ### Proxy 100 - 101 - - 
`GET /xrpc/knot/{knot}/{nsid}` - Proxy to Tangled knot 102 - - `GET /xrpc/pds/{pds}/{nsid}` - Proxy to PDS 103 - - `GET /xrpc/bsky/{nsid}` - Proxy to Bluesky 104 - 105 - ## Database Schema 106 - 107 - Core tables in Turso/libSQL: 108 - 109 - - `documents`: Denormalized search documents (title, body, metadata) 110 - - `documents_fts`: FTS5 virtual table for full-text search 111 - - `identity_handles`: DID → handle mapping with active status 112 - - `record_state`: Issue/PR state (open/closed/merged) 113 - - `sync_state`: Cursor tracking for Tap consumer resume 114 - - `indexing_jobs`: Queue for async read-through indexing 115 - 116 - ## Configuration 117 - 118 - Environment variables (loaded by `internal/config`): 119 - 120 - | Variable | Purpose | 121 - | --------------------- | ------------------------------------------ | 122 - | `HTTP_BIND_ADDR` | API server address (default `:8080`) | 123 - | `TURSO_DATABASE_URL` | libsql:// URL for Turso or file: for local | 124 - | `TURSO_AUTH_TOKEN` | Turso auth token (empty for local) | 125 - | `TAP_URL` | Tap WebSocket URL (required for indexer) | 126 - | `TAP_AUTH_PASSWORD` | Tap authentication password | 127 - | `INDEXED_COLLECTIONS` | Comma-separated allowlist (empty = all) | 128 - 129 - ## Local Development 130 - 131 - ```bash 132 - # API only (uses local SQLite file) 133 - just api-dev 134 - 135 - # Indexer only 136 - just api-run-indexer 137 - 138 - # Both in parallel (three terminals) 139 - pnpm dev # Frontend 140 - pnpm api:run:api # API 141 - pnpm api:run:indexer # Indexer 142 - ``` 143 - 144 - For remote Turso, use `--remote` flag or set `TURSO_DATABASE_URL` in `.env`.
-24
packages/scripts/api/README.md
··· 1 - # Twister API Smoke Checks 2 - 3 - Python smoke checks for Twister API endpoints, managed with uv. 4 - 5 - ## Usage 6 - 7 - From the repo root: 8 - 9 - ```sh 10 - # Run all 11 - uv run --project packages/scripts/api twister-api-smoke 12 - # Run specific checks (healthz | readyz | search | documents | indexing | activity) 13 - uv run --project packages/scripts/api twister-api-smoke --check healthz 14 - ``` 15 - 16 - ## Options 17 - 18 - - `--verbose` for detailed output of API responses (JSON) 19 - - `--base-url` (or env `TWISTER_API_BASE_URL`, default `http://localhost:8080`) 20 - - `--query` for search check (default `twisted`) 21 - - `--document-id` for documents check 22 - - `--actor-handle` for indexing check (default `desertthunder.dev`) 23 - - `--repo-at-uri` for repo fixture indexing/search checks 24 - - `--profile-at-uri` for profile fixture indexing/search checks
-1
packages/scripts/api/pyproject.toml
··· 2 2 name = "twister-api-smoke" 3 3 version = "0.1.0" 4 4 description = "Smoke checks for Twister API endpoints" 5 - readme = "README.md" 6 5 requires-python = ">=3.11" 7 6 dependencies = [] 8 7
+91 -12
packages/scripts/api/src/twister_api_smoke/cli.py
··· 1 + """Twister API Smoke Checks. 2 + 3 + 4 + Python smoke checks for Twister API endpoints, managed with uv. 5 + 6 + Usage 7 + ----- 8 + 9 + From the repo root: 10 + 11 + .. code:: sh 12 + 13 + # Run all 14 + uv run --project packages/scripts/api twister-api-smoke 15 + # Run specific checks (healthz | readyz | search | documents | indexing | admin | activity) 16 + uv run --project packages/scripts/api twister-api-smoke --check healthz 17 + 18 + Options 19 + ------- 20 + 21 + - ``--verbose`` for detailed output of API responses (JSON) 22 + - ``--base-url`` (or env ``TWISTER_API_BASE_URL``, default ``http://localhost:8080``) 23 + - ``--query`` for search check (default ``twisted``) 24 + - ``--document-id`` for documents check 25 + - ``--actor-handle`` for indexing check (default ``desertthunder.dev``) 26 + - ``--repo-at-uri`` for repo fixture indexing/search checks 27 + - ``--profile-at-uri`` for profile fixture indexing/search checks 28 + - ``--admin-token`` (or env ``ADMIN_AUTH_TOKEN``) for admin smoke checks 29 + """ 30 + 1 31 import argparse 2 32 import enum 3 33 import json ··· 44 74 actor_handle: str 45 75 repo_at_uri: str 46 76 profile_at_uri: str 77 + admin_token: str 47 78 verbose: bool 48 79 49 80 50 - def http_get_status(url: str) -> int: 81 + def auth_headers(token: str) -> dict[str, str]: 82 + if not token: 83 + return {} 84 + return {"Authorization": f"Bearer {token}"} 85 + 86 + 87 + def http_get_status(url: str, headers: dict[str, str] | None = None) -> int: 51 88 req = Request(url, method="GET") 89 + for key, value in (headers or {}).items(): 90 + req.add_header(key, value) 52 91 try: 53 92 with urlopen(req, timeout=10) as resp: 54 93 return resp.status ··· 58 97 fail(f"request failed for {url}: {err}") 59 98 60 99 61 - def http_get_json(url: str, params: dict[str, str] | None = None) -> Any: 100 + def http_get_json( 101 + url: str, 102 + params: dict[str, str] | None = None, 103 + headers: dict[str, str] | None = None, 104 + ) -> Any: 62 105 if 
params: 63 106 query = urlencode(params) 64 107 sep = "&" if "?" in url else "?" 65 108 url = f"{url}{sep}{query}" 66 109 67 110 req = Request(url, method="GET") 111 + for key, value in (headers or {}).items(): 112 + req.add_header(key, value) 68 113 try: 69 114 with urlopen(req, timeout=15) as resp: 70 115 payload = resp.read().decode("utf-8") ··· 208 253 echo("documents ok") 209 254 210 255 256 + def resolve_repo_name(opts: Options) -> str: 257 + payload = http_get_json(urljoin(opts.base_url, f"/actors/{opts.actor_handle}/repos")) 258 + records = payload.get("records") if isinstance(payload, dict) else None 259 + if not isinstance(records, list): 260 + fail("repo listing payload is missing records") 261 + for record in records: 262 + if not isinstance(record, dict): 263 + continue 264 + if record.get("uri") != opts.repo_at_uri: 265 + continue 266 + value = record.get("value") 267 + if isinstance(value, dict) and isinstance(value.get("name"), str): 268 + return value["name"] 269 + fail(f"repo fixture uri not found in /actors/{opts.actor_handle}/repos: {opts.repo_at_uri}") 270 + 271 + 211 272 def check_indexing(opts: Options) -> None: 212 273 repo_id = at_uri_to_document_id(opts.repo_at_uri) 213 274 profile_id = at_uri_to_document_id(opts.profile_at_uri) ··· 220 281 echo(f"triggering read-through fetch via /actors/{opts.actor_handle}") 221 282 assert_status(urljoin(opts.base_url, f"/actors/{opts.actor_handle}"), 200) 222 283 223 - echo(f"triggering read-through fetch via /actors/{opts.actor_handle}/repos") 224 - assert_status(urljoin(opts.base_url, f"/actors/{opts.actor_handle}/repos"), 200) 284 + repo_name = resolve_repo_name(opts) 285 + echo(f"triggering read-through fetch via /actors/{opts.actor_handle}/repos/{repo_name}") 286 + assert_status(urljoin(opts.base_url, f"/actors/{opts.actor_handle}/repos/{repo_name}"), 200) 225 287 226 288 echo(f"waiting for queued indexing of profile fixture {profile_id}") 227 289 for _ in range(30): ··· 247 309 fail(f"repo fixture 
did not become available at /documents/{repo_encoded} within 30s") 248 310 249 311 312 + def check_admin(opts: Options) -> None: 313 + if not opts.admin_token: 314 + fail("admin check requires --admin-token or ADMIN_AUTH_TOKEN") 315 + echo("checking GET /admin/status") 316 + payload = http_get_json( 317 + urljoin(opts.base_url, "/admin/status"), 318 + headers=auth_headers(opts.admin_token), 319 + ) 320 + if not isinstance(payload, dict) or "read_through" not in payload: 321 + fail("admin status response is missing read_through data") 322 + maybe_log_json(opts, "admin-status", payload) 323 + echo("admin ok") 324 + 325 + 250 326 def check_activity(opts: Options) -> None: 251 327 echo("checking websocket handshake on /activity/stream") 252 328 parsed = urlparse(opts.base_url) ··· 285 361 "search": check_search, 286 362 "documents": check_documents, 287 363 "indexing": check_indexing, 364 + "admin": check_admin, 288 365 "activity": check_activity, 289 366 } 290 367 ··· 322 399 help="Profile AT URI expected to be fetched and indexed by smoke checks", 323 400 ) 324 401 parser.add_argument( 402 + "--admin-token", 403 + default=os.environ.get("ADMIN_AUTH_TOKEN", ""), 404 + help="Bearer token for admin smoke checks", 405 + ) 406 + parser.add_argument( 325 407 "--verbose", action="store_true", help="Print JSON payloads returned by smoke endpoints" 326 408 ) 327 409 ··· 335 417 actor_handle=ns.actor_handle, 336 418 repo_at_uri=ns.repo_at_uri, 337 419 profile_at_uri=ns.profile_at_uri, 420 + admin_token=ns.admin_token, 338 421 verbose=ns.verbose, 339 422 ) 340 423 ··· 342 425 def main(argv: list[str] | None = None) -> int: 343 426 opts = parse_args(sys.argv[1:] if argv is None else argv) 344 427 if opts.check == "all": 345 - for name in ( 346 - "healthz", 347 - "readyz", 348 - "indexing", 349 - "search", 350 - "documents", 351 - "activity", 352 - ): 428 + checks = ["healthz", "readyz", "indexing", "search", "documents", "activity"] 429 + if opts.admin_token: 430 + 
checks.append("admin") 431 + for name in checks: 353 432 CHECKS[name](opts) 354 433 echo("all API smoke checks passed") 355 434 return 0
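The admin smoke check authenticates with a plain bearer header built by `auth_headers`. Should a Go-side probe ever want the same thing, the equivalent is a one-liner on `net/http` — a sketch using the `/admin/status` path and token semantics from the Python CLI above:

```go
package main

import (
	"fmt"
	"net/http"
)

// adminRequest builds a GET against /admin/status with the bearer token
// attached, mirroring the Python auth_headers helper: an empty token
// means no Authorization header at all.
func adminRequest(baseURL, token string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/admin/status", nil)
	if err != nil {
		return nil, err
	}
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	return req, nil
}

func main() {
	req, err := adminRequest("http://localhost:8080", "secret")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.URL.String())
	fmt.Println(req.Header.Get("Authorization"))
}
```

Omitting the header entirely (rather than sending `Bearer `) keeps the unauthenticated case indistinguishable from a client that never knew about admin endpoints.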