# Ramjet
Single-node Rust service that consumes the ATProtocol firehose, persists records for tracked collections in fjall (pure-Rust LSM-tree), and re-emits events via WebSocket fan-out with priority ordering.
## Motivation
ATProtocol applications often need a mix of precisely tracked collections and best-effort network-wide intake. Ramjet solves this with separately configured tracked and forwarded collection sets:
- Tracked collections are persisted to storage and emitted on the high-priority fan-out channel — these are the records your application depends on for correctness.
- Forwarded collections are emitted on the low-priority fan-out channel without being persisted — useful for analytics, search indexing, or other consumers that want broad visibility without the storage cost.
For example, Lexicon Garden needs accurately tracked lexicon schema and AppView-specific collections, but also wants all records for analytics. Smoke Signal similarly tracks event, RSVP, profile, and location records from different collections explicitly, while still consuming the full network stream.
Both collection configurations can be updated at runtime, and the admin API supports on-demand backfill and resync operations for any repository — making it straightforward to add new tracked collections and catch up on historical data.
## Quick start

```sh
cargo build --release
cargo run --release -- \
  --db-path /var/lib/ramjet/data \
  --listen-addr 0.0.0.0:8080 \
  --relay-host bsky.network \
  --tracked-collections "app.bsky.feed.*" \
  --forward-collections "app.bsky.**"
```
## Configuration
All options are available as CLI flags or environment variables.
| Flag | Env | Default | Description |
|---|---|---|---|
| `--db-path` | `RAMJET_DB_PATH` | `./data/ramjet.db` | fjall database directory |
| `--listen-addr` | `RAMJET_LISTEN_ADDR` | `0.0.0.0:8080` | HTTP listen address |
| `--relay-host` | `RAMJET_RELAY_HOST` | `bsky.network` | Firehose relay hostname |
| `--tracked-collections` | `RAMJET_TRACKED_COLLECTIONS` | `*` | Collections to persist (space-separated patterns) |
| `--forward-collections` | `RAMJET_FORWARD_COLLECTIONS` | `*` | Collections to forward via WebSocket only |
| `--event-retention-hours` | `RAMJET_EVENT_RETENTION_HOURS` | `72` | Hours to retain events |
| `--batch-size` | `RAMJET_BATCH_SIZE` | `500` | Ingester batch size |
| `--batch-timeout-ms` | `RAMJET_BATCH_TIMEOUT_MS` | `100` | Batch flush timeout (ms) |
| `--admin-dids` | `RAMJET_ADMIN_DIDS` | (empty) | Comma-separated admin DIDs for protected endpoints |
| `--zstd-dict-path` | `RAMJET_ZSTD_DICT_PATH` | (none) | Path to zstd dictionary for event compression |
| `--backfill` | `RAMJET_BACKFILL` | (empty) | Comma-separated DIDs to backfill on startup |
| `--consumer-group` | `RAMJET_CONSUMER_GROUPS` | (empty) | Consumer group definitions (`name:partitions`, comma-separated) |
## Collection patterns

- `*` — match all collections
- `com.example.feed` — exact NSID match
- `com.example.*` — single-segment wildcard (matches `com.example.feed` but not `com.example.feed.like`)
- `com.example.**` — glob wildcard (matches `com.example.feed`, `com.example.feed.like`, etc.)
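The rules above can be sketched in Rust. This is an illustrative matcher written for this README, not Ramjet's actual implementation; `matches_pattern` is a hypothetical helper name.

```rust
// Hypothetical sketch of the collection pattern rules; not Ramjet's API.
fn matches_pattern(pattern: &str, nsid: &str) -> bool {
    if pattern == "*" {
        return true; // match all collections
    }
    if let Some(prefix) = pattern.strip_suffix(".**") {
        // glob wildcard: any number of additional segments
        return nsid
            .strip_prefix(prefix)
            .map_or(false, |rest| rest.starts_with('.') && rest.len() > 1);
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        // single-segment wildcard: exactly one additional segment
        return nsid
            .strip_prefix(prefix)
            .and_then(|rest| rest.strip_prefix('.'))
            .map_or(false, |seg| !seg.is_empty() && !seg.contains('.'));
    }
    pattern == nsid // exact NSID match
}

fn main() {
    assert!(matches_pattern("*", "com.example.feed"));
    assert!(matches_pattern("com.example.feed", "com.example.feed"));
    assert!(matches_pattern("com.example.*", "com.example.feed"));
    assert!(!matches_pattern("com.example.*", "com.example.feed.like"));
    assert!(matches_pattern("com.example.**", "com.example.feed.like"));
}
```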
## Zstd dictionary compression
When `--zstd-dict-path` is provided, Ramjet compresses events in the `events` keyspace using a pre-trained zstd dictionary. The dictionary is identified by hash and stored in the `meta` keyspace so dictionary changes are detected on restart. Clients can download the active dictionary via the `GET /dictionary` endpoint (supports `If-None-Match` with CID-based ETags).
## Consumer groups
Consumer groups partition events across multiple WebSocket connections by DID hash, similar to Kafka consumer groups. Define groups at startup:
```sh
cargo run --release -- \
  --consumer-group indexers:3 \
  --consumer-group notifiers:2
```
Clients connect with `?group=indexers&partition=0` to receive only events for DIDs that hash to partition 0. All events for the same DID always route to the same partition. Omitting group/partition gives broadcast mode (all events).
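The routing invariant can be sketched as follows. The concrete hash function Ramjet uses is not documented here, so this sketch stands in std's `DefaultHasher`; `partition_for` is an illustrative name, not Ramjet's API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical sketch of DID-hash partitioning; the hash function is an
// assumption (std's DefaultHasher), not necessarily what Ramjet uses.
fn partition_for(did: &str, partitions: u32) -> u32 {
    let mut h = DefaultHasher::new();
    did.hash(&mut h);
    (h.finish() % partitions as u64) as u32
}

fn main() {
    // The invariant that matters: the same DID always maps to the same
    // partition, and the partition index is always in range.
    let p1 = partition_for("did:plc:abc123", 3);
    let p2 = partition_for("did:plc:abc123", 3);
    assert_eq!(p1, p2);
    assert!(p1 < 3);
}
```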
## Architecture
Five async pipelines connected by channels:
```
Firehose ──→ Ingester ──mpsc──→ Writer ──broadcast──→ WebSocket Fan-out
                                  │                       ↑ ↑
                                  ├──partitioned──────────┘ │  (consumer groups)
                                  ├─→ fjall storage         │
                                  └─→ events keyspace ──────┘  (cursor replay)

Backfill Worker ──→ CAR fetch ──→ fjall storage
Identity Worker ──→ DID/handle resolution ──→ fjall storage
```
- Firehose Ingester — connects to the relay via WebSocket, decodes CBOR frames (`#commit`, `#identity`, `#account`), and sends events over an mpsc channel. Auto-reconnects with a 2s delay.
- Batch Writer — collects events into configurable batches (size + timeout), writes to fjall atomically via `WriteBatch`, and broadcasts to fan-out channels with priority routing (tracked = high, forwarded = low). Filters denied repos. Queues `tooBig` commits and desynced repos for backfill.
- WebSocket Fan-out — serves `dev.ngerakines.ramjet.stream.subscribe` with cursor-based replay from the events keyspace, then switches to live streaming with biased priority delivery (high > low > client recv). Supports both broadcast and partitioned (consumer group) modes.
- Identity Worker — resolves DID documents and handle mappings via concurrent resolution (bounded by a semaphore), updating the `did_to_doc` and `handle_to_did` keyspaces.
- Backfill Worker — polls a backfill queue every 5s, fetches full repo CARs from PDS endpoints using `DiskRepository` (spills to disk for large repos), parses records, and writes to storage. Repos exceeding 5 consecutive failures are auto-denied.
## Storage
Eight fjall keyspaces:
| Keyspace | Key format | Value | Purpose |
|---|---|---|---|
| `records` | `did\0collection\0rkey\0rev` | CID + DAG-CBOR (empty = tombstone) | Versioned record history |
| `events` | u64 BE | Compact binary event | Event log for cursor replay |
| `meta` | string key | varies | Cursor, sequence counter, backfill queue, RIBLT cache |
| `repo_state` | DID bytes | RepoState (rev + status + denied + backfilled) | Per-repo state |
| `did_to_doc` | DID bytes | Timestamped JSON DID document | Identity cache |
| `handle_to_did` | handle bytes | DID string | Handle-to-DID mapping |
| `blobs` | blob key | blob data | Blob storage |
| `blob_meta` | blob key | metadata | Blob metadata |
Records are versioned by including the repository revision in the key. Each create/update appends a new entry; deletes append a tombstone (empty value). The latest version of a record is the last entry in a prefix scan over `did\0collection\0rkey\0`. This provides a built-in change history without a separate temporal keyspace.
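The key layout above can be sketched with a small helper. This is an illustrative reconstruction for this README (`record_key` is a hypothetical name), assuming revisions sort lexicographically within the shared prefix, which is what makes "last entry of the prefix scan" the latest version.

```rust
// Hypothetical sketch of the versioned record key layout:
// did \0 collection \0 rkey \0 rev
fn record_key(did: &str, collection: &str, rkey: &str, rev: &str) -> Vec<u8> {
    let mut key = Vec::new();
    for part in [did, collection, rkey] {
        key.extend_from_slice(part.as_bytes());
        key.push(0); // NUL separator between key components
    }
    key.extend_from_slice(rev.as_bytes());
    key
}

fn main() {
    // Two versions of the same record share the did\0collection\0rkey\0
    // prefix; sorting the keys puts the later revision last.
    let mut entries = vec![
        record_key("did:plc:abc", "com.example.feed", "3k2a", "rev2"),
        record_key("did:plc:abc", "com.example.feed", "3k2a", "rev1"),
    ];
    entries.sort();
    assert!(entries.last().unwrap().ends_with(b"rev2"));
}
```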
## Event encoding
Events are stored in a compact binary format with tag-based versioning:
- V1 tags (`0x01`–`0x03`): 1B tag + 8B sequence + variable fields
- V2 tags (`0x04`–`0x06`): 1B tag + 8B sequence + 8B timestamp (microseconds) + variable fields
Commit operations, identity events, and account events each have a dedicated compact encoding. The V2 format adds microsecond timestamps for latency tracking. Events are optionally compressed with zstd dictionary compression before storage.
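A minimal sketch of the V2 header layout described above: 1-byte tag, 8-byte sequence, 8-byte microsecond timestamp. Big-endian integer fields are an assumption here (mirroring the big-endian event-log keys); `encode_v2_header` is an illustrative name, not Ramjet's actual encoder.

```rust
// Hypothetical sketch of the 17-byte V2 event header.
// Big-endian field encoding is an assumption, not confirmed by the docs.
fn encode_v2_header(tag: u8, seq: u64, ts_micros: u64) -> [u8; 17] {
    let mut buf = [0u8; 17];
    buf[0] = tag; // V2 tags occupy 0x04-0x06
    buf[1..9].copy_from_slice(&seq.to_be_bytes());
    buf[9..17].copy_from_slice(&ts_micros.to_be_bytes());
    buf
}

fn main() {
    let header = encode_v2_header(0x04, 42, 1_700_000_000_000_000);
    assert_eq!(header.len(), 17); // 1B tag + 8B seq + 8B timestamp
    assert_eq!(header[0], 0x04);
    assert_eq!(u64::from_be_bytes(header[1..9].try_into().unwrap()), 42);
}
```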
## API
### Health and metrics
- `GET /_health` — returns `{"status":"ok","version":"1.2.0"}`
- `GET /metrics` — Prometheus text format (includes HTTP metrics, pipeline counters, tokio task metrics, fan-out queue depths)
### XRPC endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | `com.atproto.repo.getRecord` | Fetch a single record by repo/collection/rkey |
| GET | `com.atproto.repo.listRecords` | List records in a collection with cursor pagination |
| GET | `com.atproto.repo.describeRepo` | Repo metadata, collections, rev, DID document |
| GET | `com.atproto.identity.resolveIdentity` | Resolve DID or handle to DID document (supports `Cache-Control: max-stale=N`) |
| GET | `com.atproto.identity.resolveHandle` | Resolve handle to DID |
| GET | `com.atproto.identity.resolveDid` | Fetch DID document |
| GET | `dev.ngerakines.ramjet.stream.subscribe` | WebSocket event stream with cursor replay and consumer group support |
| GET | `dev.ngerakines.ramjet.repos.getReconciliation` | RIBLT sketch for set reconciliation |
### Set reconciliation (RIBLT)
The `getReconciliation` endpoint returns an RIBLT sketch for a DID's tracked records, allowing consumers to efficiently determine which records they're missing without transferring the full record list.
```
GET /xrpc/dev.ngerakines.ramjet.repos.getReconciliation?did=did:plc:abc123
```
Returns `application/x-riblt` binary with response headers:

- `ETag` — repo rev (supports `If-None-Match` for 304 Not Modified)
- `X-Riblt-Records` — number of records encoded
- `X-Riblt-Rev` — repo revision at time of sketch generation
An optional `collection` parameter filters the sketch to a single collection.
Client workflow:
1. Build a local sketch from your records using the same symbol encoding (`collection\0rkey\0cid`)
2. Fetch Ramjet's sketch
3. Subtract and decode — the symmetric difference tells you which records you're missing (and which you have that Ramjet doesn't)
4. Fetch missing records via `getRecord` or `listRecords`
Sketch sizes are allocated in 25,000-cell increments, bumping to the next tier when the record count exceeds 50% past the current size. Sketches are cached per-DID and automatically invalidated when records change.
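One possible reading of the tier rule above, as a sketch: cell counts grow in 25,000-cell steps, and the tier bumps once the record count is more than 50% past the current size. This is an illustrative interpretation written for this README (`sketch_cells` is a hypothetical name), not Ramjet's exact allocation code.

```rust
// Hypothetical sketch of RIBLT cell-tier allocation; the exact bump
// threshold semantics are an assumption based on the prose above.
const TIER: u64 = 25_000;

fn sketch_cells(records: u64) -> u64 {
    let mut cells = TIER;
    // Bump to the next 25,000-cell tier while the record count exceeds
    // 150% of the current tier size.
    while records > cells + cells / 2 {
        cells += TIER;
    }
    cells
}

fn main() {
    assert_eq!(sketch_cells(10_000), 25_000);
    assert_eq!(sketch_cells(37_500), 25_000); // exactly 50% past: still fits
    assert_eq!(sketch_cells(37_501), 50_000); // past the threshold: bump
}
```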
### Admin endpoints (JWT auth required)
| Method | Endpoint | Description |
|---|---|---|
| GET | `dev.ngerakines.ramjet.repos.getState` | Get repo state (rev, status, denied) |
| POST | `dev.ngerakines.ramjet.repos.setState` | Set repo denied flag |
| POST | `dev.ngerakines.ramjet.repos.resync` | Queue a repo for backfill |
| POST | `dev.ngerakines.ramjet.repos.rebuildReconciliation` | Force rebuild of RIBLT sketch cache |
### Other endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | `/dictionary` | Download the active zstd dictionary (supports `If-None-Match` with CID ETags) |
## Development
```sh
cargo check          # type-check
cargo test           # run tests (50 unit tests)
cargo fmt            # format code
cargo build          # debug build
```
## Binaries
- `ramjet` — main service
- `ramjet-writer` — benchmark binary for ingester-to-writer pipeline analysis
- `ramjet-data` — data inspection tool
- `ramjet-dictgen` — zstd dictionary generator
- `ramjet-forecast` — capacity forecasting
- `ramjet-consumer` — example WebSocket consumer
- `rjtop` — TUI dashboard
## License
MIT