at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 153 lines 9.1 kB view raw view rendered
1# hydrant 2 3`hydrant` is an AT Protocol indexer built on the `fjall` database that handles sync for you. it's flexible, supporting both full-network indexing and filtered indexing (e.g., by DID), also allowing querying with XRPCs and providing an ordered event stream with cursor support. 4 5you can see [random.wisp.place](https://tangled.org/did:plc:dfl62fgb7wtjj3fcbb72naae/random.wisp.place) for an example on how to use hydrant. 6 7**WARNING: *the db format is not stable yet.*** it's in active development so if you are going to rely on the db format being stable, don't (eg. for query features, if you are using ephemeral mode this doesn't matter for example, or you dont mind losing your existing backfilled data in hydrant if you already processed them.). 8 9## vs `tap` 10 11while [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is designed as a firehose consumer and simply just propagates events while handling sync, `hydrant` is flexible, it allows you to directly query the database for records, and it also provides an ordered view of events, allowing the use of a cursor to fetch events from a specific point in time. 12 13### stream behavior 14 15the `WS /stream` (hydrant) and `WS /channel` (tap) endpoints have different designs: 16 17| aspect | `tap` (`/channel`) | `hydrant` (`/stream`) | 18| :--- | :--- | :--- | 19| distribution | sharded work queue: events are load-balanced across connected clients. If 5 clients connect, each receives ~20% of events. | broadcast: every connected client receives a full copy of the event stream. if 5 clients connect, all 5 receive 100% of events. | 20| cursors | server-managed: clients ACK messages. the server tracks progress and redelivers unacked messages. | client-managed: client provides `?cursor=123`. the server streams from that point. | 21| persistence | events are stored in an outbox and sent to the consumer, removing them, so they can't be replayed once they are acked. | `record` events are replayable. `identity`/`account` are ephemeral. use `GET /repos/:did` to query identity / account info (handle, pds, signing key, etc.). | 22| backfill | backfill events are mixed into the live queue and prioritized (per-repo, acting as synchronization barrier) by the server. | backfill simply inserts historical events (`live: false`) into the global event log. streaming is just reading this log sequentially. synchronization is the same as tap, `live: true` vs `live: false`. | 23| event types | `record`, `identity` (includes status) | `record`, `identity` (handle), `account` (status) | 24 25### multiple relay support 26 27`hydrant` supports connecting to multiple relays simultaneously for both firehose ingestion and crawling. when `RELAY_HOSTS` is configured with multiple URLs: 28 29- one independent firehose stream loop is spawned per relay 30- one independent crawling loop is spawned per relay 31- each relay maintains its own firehose / crawler cursor state 32- all ingestion loops and crawlers share the same worker pool and database 33- all crawlers share the same pending queue for backfill 34 35## configuration 36 37`hydrant` is configured via environment variables. all variables are prefixed with `HYDRANT_` (except `RUST_LOG`). 38 39| variable | default | description | 40| :--- | :--- | :--- | 41| `DATABASE_PATH` | `./hydrant.db` | path to the database folder. | 42| `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). standard [`tracing` env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html). | 43| `RELAY_HOST` | `http://relay.fire.hose.cam` | URL of the relay. | 44| `RELAY_HOSTS` | | comma-separated list of relay URLs. if unset, falls back to `RELAY_HOST`. | 45| `PLC_URL` | `https://plc.wtf` | base URL(s) of the PLC directory (comma-separated for multiple). | 46| `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. | 47| `FILTER_SIGNALS` | | comma-separated list of NSID patterns to use for the filter on startup (e.g. `app.bsky.feed.post,app.bsky.graph.*`). | 48| `FILTER_COLLECTIONS` | | comma-separated list of NSID patterns to use for the collections filter on startup. | 49| `FILTER_EXCLUDES` | | comma-separated list of DIDs to exclude from indexing on startup. | 50| `FIREHOSE_WORKERS` | `8` (`32` if full network) | number of concurrent workers for firehose events. | 51| `BACKFILL_CONCURRENCY_LIMIT` | `128` | maximum number of concurrent backfill tasks. | 52| `VERIFY_SIGNATURES` | `full` | signature verification level: `full`, `backfill-only`, or `none`. | 53| `CURSOR_SAVE_INTERVAL` | `5` | interval (in seconds) to save the firehose cursor. | 54| `REPO_FETCH_TIMEOUT` | `300` | timeout (in seconds) for fetching repositories. | 55| `CACHE_SIZE` | `256` | size of the database cache in MB. | 56| `IDENTITY_CACHE_SIZE` | `1000000` | number of identity entries to cache. | 57| `API_PORT` | `3000` | port for the API server. | 58| `ENABLE_DEBUG` | `false` | enable debug endpoints. | 59| `DEBUG_PORT` | `3001` | port for debug endpoints (if enabled). | 60| `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. | 61| `EPHEMERAL` | `false` | if enabled, no records are stored (XRPCs won't be reliable). events are only stored up to an hour for playback. | 62| `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. | 63| `ENABLE_BACKFILL` | `true` | whether to backfill from PDS instances. | 64| `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. | 65| `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. | 66| `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. | 67 68## api 69 70### management 71 72- `GET /filter`: get the current filter configuration. 73- `PATCH /filter`: update the filter configuration. 74 75#### filter mode 76 77the `mode` field controls what gets indexed: 78 79| mode | behaviour | 80| :--- | :--- | 81| `filter` | auto-discovers and backfills any account whose firehose commit touches a collection matching one of the `signals` patterns. you can also explicitly track individual repositories via the `/repos` endpoint regardless of matching signals. | 82| `full` | index the entire network. `signals` are ignored for discovery, but `excludes` and `collections` still apply. | 83 84#### fields 85 86| field | type | description | 87| :--- | :--- | :--- | 88| `mode` | `"filter"` \| `"full"` | indexing mode (see above). | 89| `signals` | set update | NSID patterns (e.g. `app.bsky.feed.post` or `app.bsky.*`) that trigger auto-discovery in `filter` mode. | 90| `collections` | set update | NSID patterns used to filter which records are stored. if empty, all collections are stored. applies in all modes. | 91| `excludes` | set update | set of DIDs to always skip, regardless of mode. checked before any other filter logic. | 92 93#### set updates 94 95each set field accepts one of two forms: 96 97- **replace**: an array replaces the entire set — `["did:plc:abc", "did:web:example.org"]` 98- **patch**: an object maps items to `true` (add) or `false` (remove) — `{"did:plc:abc": true, "did:web:example.org": false}` 99 100#### NSID patterns 101 102`signals` and `collections` support an optional `.*` suffix to match an entire namespace: 103 104- `app.bsky.feed.post` — exact match only 105- `app.bsky.feed.*` — matches any collection under `app.bsky.feed` 106 107### repository management 108 109- `GET /repos`: get an NDJSON stream of repositories and their sync status. supports pagination and filtering: 110 - `limit`: max results (default 100, max 1000) 111 - `cursor`: opaque key for paginating. 112 - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries) 113- `GET /repos/{did}`: get the sync status and metadata of a specific repository. also returns the handle, PDS URL and the atproto signing key (these won't be available before the repo has been backfilled once at least). 114- `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 115- `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). optionally include `"deleteData": true` to also purge the repository from the database. 116 117### data access (xrpc) 118 119`hydrant` implements the following XRPC endpoints under `/xrpc/`: 120 121#### com.atproto.* 122 123the following are implemented currently: 124- `com.atproto.repo.getRecord` 125- `com.atproto.repo.listRecords` 126 127#### systems.gaze.hydrant.* 128 129these are some non-standard XRPCs that might be useful. 130 131##### systems.gaze.hydrant.countRecords 132 133return the total number of stored records in a collection. 134 135| param | required | description | 136| :--- | :--- | :--- | 137| `identifier` | yes | DID or handle of the repository. | 138| `collection` | yes | NSID of the collection. | 139 140returns `{ count }`. 141 142### event stream 143 144- `GET /stream`: subscribe to the event stream. 145 - query parameters: 146 - `cursor` (optional): start streaming from a specific event ID. 147- `POST /stream/ack`: ack events. 148 - body: 149 - `ids`: list of event IDs to acknowledge. 150 151### stats 152 153- `GET /stats`: get aggregate counts of repos, records, events, and errors.