# hydrant

`hydrant` is an AT Protocol indexer built on the `fjall` database that handles sync for you. it's flexible, supporting both full-network and filtered indexing (e.g., by DID), and it also lets you query records via XRPC and consume an ordered event stream with cursor support.

see [random.wisp.place](https://tangled.org/did:plc:dfl62fgb7wtjj3fcbb72naae/random.wisp.place) for an example of how to use hydrant.

**WARNING: *the db format is not stable yet.*** hydrant is in active development, so don't rely on the database format staying stable (e.g. for query features). this doesn't matter if you run in ephemeral mode, or if you don't mind losing already-backfilled data and reprocessing it after a format change.

## vs `tap`

while [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is designed as a firehose consumer that simply propagates events while handling sync, `hydrant` is more flexible: it lets you query the database for records directly, and it provides an ordered view of events, so you can use a cursor to fetch events from a specific point in time.

### stream behavior

the `WS /stream` (hydrant) and `WS /channel` (tap) endpoints have different designs:

| aspect | `tap` (`/channel`) | `hydrant` (`/stream`) |
| :--- | :--- | :--- |
| distribution | sharded work queue: events are load-balanced across connected clients. if 5 clients connect, each receives ~20% of events. | broadcast: every connected client receives a full copy of the event stream. if 5 clients connect, all 5 receive 100% of events. |
| cursors | server-managed: clients ACK messages. the server tracks progress and redelivers unacked messages. | client-managed: the client provides `?cursor=123` and the server streams from that point. |
| persistence | events are stored in an outbox, sent to the consumer, and removed once acked, so they can't be replayed. | `record` events are replayable. `identity`/`account` events are ephemeral; use `GET /repos/:did` to query identity / account info (handle, pds, signing key, etc.). |
| backfill | backfill events are mixed into the live queue and prioritized (per-repo, acting as a synchronization barrier) by the server. | backfill simply inserts historical events (`live: false`) into the global event log; streaming is just reading this log sequentially. synchronization works the same as in tap: `live: true` vs `live: false`. |
| event types | `record`, `identity` (includes status) | `record`, `identity` (handle), `account` (status) |

### multiple relay support

`hydrant` supports connecting to multiple relays simultaneously for both firehose ingestion and crawling. when `RELAY_HOSTS` is configured with multiple URLs:

- one independent firehose stream loop is spawned per relay
- one independent crawling loop is spawned per relay
- each relay maintains its own firehose / crawler cursor state
- all ingestion loops and crawlers share the same worker pool and database
- all crawlers share the same pending queue for backfill

## configuration

`hydrant` is configured via environment variables. all variables are prefixed with `HYDRANT_` (except `RUST_LOG`).

| variable | default | description |
| :--- | :--- | :--- |
| `DATABASE_PATH` | `./hydrant.db` | path to the database folder. |
| `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). standard [`tracing` env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html). |
| `RELAY_HOST` | `http://relay.fire.hose.cam` | URL of the relay. |
| `RELAY_HOSTS` | | comma-separated list of relay URLs. if unset, falls back to `RELAY_HOST`. |
| `PLC_URL` | `https://plc.wtf` | base URL(s) of the PLC directory (comma-separated for multiple). |
| `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. |
| `FILTER_SIGNALS` | | comma-separated list of NSID patterns to seed the signals filter on startup (e.g. `app.bsky.feed.post,app.bsky.graph.*`). |
| `FILTER_COLLECTIONS` | | comma-separated list of NSID patterns to seed the collections filter on startup. |
| `FILTER_EXCLUDES` | | comma-separated list of DIDs to exclude from indexing on startup. |
| `FIREHOSE_WORKERS` | `8` (`32` if full network) | number of concurrent workers for firehose events. |
| `BACKFILL_CONCURRENCY_LIMIT` | `128` | maximum number of concurrent backfill tasks. |
| `VERIFY_SIGNATURES` | `full` | signature verification level: `full`, `backfill-only`, or `none`. |
| `CURSOR_SAVE_INTERVAL` | `5` | interval (in seconds) at which the firehose cursor is saved. |
| `REPO_FETCH_TIMEOUT` | `300` | timeout (in seconds) for fetching repositories. |
| `CACHE_SIZE` | `256` | size of the database cache in MB. |
| `IDENTITY_CACHE_SIZE` | `1000000` | number of identity entries to cache. |
| `API_PORT` | `3000` | port for the API server. |
| `ENABLE_DEBUG` | `false` | enable debug endpoints. |
| `DEBUG_PORT` | `3001` | port for debug endpoints (if enabled). |
| `NO_LZ4_COMPRESSION` | `false` | disable lz4 compression for storage. |
| `EPHEMERAL` | `false` | if enabled, no records are stored (XRPCs won't be reliable). events are kept for only an hour for playback. |
| `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. |
| `ENABLE_BACKFILL` | `true` | whether to backfill from PDS instances. |
| `ENABLE_CRAWLER` | `false` (filter mode), `true` (full network) | whether to actively query the network for unknown repositories. |
| `CRAWLER_MAX_PENDING_REPOS` | `2000` | maximum number of pending repos the crawler will queue. |
| `CRAWLER_RESUME_PENDING_REPOS` | `1000` | pending-repo count below which crawling resumes. |
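
for example, a filtered instance tracking bluesky feed data might be configured like this (the relay URLs and log filter are illustrative, not recommendations):

```sh
# illustrative configuration for a filtered hydrant instance
export HYDRANT_DATABASE_PATH=./hydrant.db
export HYDRANT_RELAY_HOSTS="http://relay1.example.com,http://relay2.example.com"
export HYDRANT_FULL_NETWORK=false
export HYDRANT_FILTER_SIGNALS="app.bsky.feed.post,app.bsky.feed.*"
export HYDRANT_FILTER_COLLECTIONS="app.bsky.feed.*"
export HYDRANT_API_PORT=3000
export RUST_LOG=info,hydrant=debug
```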

## api

### management

- `GET /filter`: get the current filter configuration.
- `PATCH /filter`: update the filter configuration.

#### filter mode

the `mode` field controls what gets indexed:

| mode | behaviour |
| :--- | :--- |
| `filter` | auto-discovers and backfills any account whose firehose commit touches a collection matching one of the `signals` patterns. you can also explicitly track individual repositories via the `/repos` endpoint, regardless of whether they match any signals. |
| `full` | index the entire network. `signals` are ignored for discovery, but `excludes` and `collections` still apply. |
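
as a sketch, a `PATCH /filter` body that switches to `filter` mode and replaces the signal patterns might look like this (the patterns are illustrative; the field semantics are described below):

```python
import json

# illustrative PATCH /filter body: switch to filter mode and replace
# the signal patterns wholesale (an array replaces the entire set)
body = {
    "mode": "filter",
    "signals": ["app.bsky.feed.post", "app.bsky.graph.*"],
}
payload = json.dumps(body)
```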

#### fields

| field | type | description |
| :--- | :--- | :--- |
| `mode` | `"filter"` \| `"full"` | indexing mode (see above). |
| `signals` | set update | NSID patterns (e.g. `app.bsky.feed.post` or `app.bsky.*`) that trigger auto-discovery in `filter` mode. |
| `collections` | set update | NSID patterns used to filter which records are stored. if empty, all collections are stored. applies in all modes. |
| `excludes` | set update | set of DIDs to always skip, regardless of mode. checked before any other filter logic. |

#### set updates

each set field accepts one of two forms:

- **replace**: an array replaces the entire set — `["did:plc:abc", "did:web:example.org"]`
- **patch**: an object maps items to `true` (add) or `false` (remove) — `{"did:plc:abc": true, "did:web:example.org": false}`
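
the two forms can be modeled as follows; this is a sketch of the semantics as described above, not the server's implementation:

```python
def apply_set_update(current: set, update) -> set:
    # array form: replace the entire set
    if isinstance(update, list):
        return set(update)
    # object form: True adds the item, False removes it
    result = set(current)
    for item, keep in update.items():
        if keep:
            result.add(item)
        else:
            result.discard(item)
    return result
```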

#### NSID patterns

`signals` and `collections` support an optional `.*` suffix to match an entire namespace:

- `app.bsky.feed.post` — exact match only
- `app.bsky.feed.*` — matches any collection under `app.bsky.feed`
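
a sketch of this matching rule (assuming `.*` matches only collections strictly under the prefix, per the description above):

```python
def nsid_matches(pattern: str, collection: str) -> bool:
    if pattern.endswith(".*"):
        # namespace match: anything under the prefix;
        # keep the trailing dot so `app.bsky.feed.*` doesn't match `app.bsky.feedx.y`
        return collection.startswith(pattern[:-1])
    # otherwise, exact match only
    return collection == pattern
```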

### repository management

- `GET /repos`: get an NDJSON stream of repositories and their sync status. supports pagination and filtering:
  - `limit`: max results (default 100, max 1000)
  - `cursor`: opaque key for pagination.
  - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries)
- `GET /repos/{did}`: get the sync status and metadata of a specific repository. also returns the handle, PDS URL, and atproto signing key (these are not available until the repo has been backfilled at least once).
- `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` objects (or a JSON array of the same).
- `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` objects (or a JSON array of the same). optionally include `"deleteData": true` to also purge the repository from the database.
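
as a sketch, the NDJSON body for `PUT /repos` can be built like this (the DIDs are illustrative):

```python
import json

def ndjson_repos(dids):
    # one {"did": ...} object per line, matching the PUT /repos body format
    return "\n".join(json.dumps({"did": d}) for d in dids)

body = ndjson_repos(["did:plc:abc", "did:web:example.org"])
```

send the result as the request body of a `PUT` to `/repos` on the API port (3000 by default).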

### data access (xrpc)

`hydrant` implements the following XRPC endpoints under `/xrpc/`:

#### com.atproto.*

the following are currently implemented:

- `com.atproto.repo.getRecord`
- `com.atproto.repo.listRecords`

#### systems.gaze.hydrant.*

these are non-standard XRPCs that might be useful.

##### systems.gaze.hydrant.countRecords

returns the total number of stored records in a collection.

| param | required | description |
| :--- | :--- | :--- |
| `identifier` | yes | DID or handle of the repository. |
| `collection` | yes | NSID of the collection. |

returns `{ count }`.
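
as a sketch, the query URL for this endpoint can be built like this; `localhost:3000` assumes the default `API_PORT`, and the DID is illustrative:

```python
from urllib.parse import urlencode

# hypothetical local instance; 3000 is the default API_PORT
base = "http://localhost:3000/xrpc/systems.gaze.hydrant.countRecords"
params = {"identifier": "did:plc:abc", "collection": "app.bsky.feed.post"}
url = f"{base}?{urlencode(params)}"
# issue a GET against `url`; the JSON response body contains `count`
```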

### event stream

- `GET /stream`: subscribe to the event stream.
  - query parameters:
    - `cursor` (optional): start streaming from a specific event ID.
- `POST /stream/ack`: ack events.
  - body:
    - `ids`: list of event IDs to acknowledge.
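
since cursors are client-managed, the consumer is responsible for remembering how far it has read. a minimal sketch of that bookkeeping, assuming each stream message is a JSON object carrying a numeric event id in an `id` field (an assumption about the wire format; check what your instance actually emits):

```python
import json

def advance_cursor(cursor, raw_message):
    # keep the highest event id seen so far; on reconnect, pass it
    # back as ?cursor= to resume from that point
    event = json.loads(raw_message)
    event_id = event.get("id")
    if event_id is None:
        return cursor
    return event_id if cursor is None else max(cursor, event_id)
```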

### stats

- `GET /stats`: get aggregate counts of repos, records, events, and errors.