Ramjet is a relay consumer that supports configurable forwarded and tracked collection sets, as well as record reconciliation.
Ramjet

Single-node Rust service that consumes the ATProtocol firehose, persists records for tracked collections in fjall (pure-Rust LSM-tree), and re-emits events via WebSocket fan-out with priority ordering.

Motivation

ATProtocol applications often need a mix of precisely tracked collections and best-effort network-wide intake. Ramjet solves this with separately configured tracked and forwarded collection sets:

  • Tracked collections are persisted to storage and emitted on the high-priority fan-out channel — these are the records your application depends on for correctness.
  • Forwarded collections are emitted on the low-priority fan-out channel without being persisted — useful for analytics, search indexing, or other consumers that want broad visibility without the storage cost.

For example, Lexicon Garden needs its lexicon schema and AppView-specific collections tracked accurately, but also wants every record on the network for analytics. Smoke Signal similarly tracks event, RSVP, profile, and location records from several collections explicitly, while still consuming the full network stream.

Both collection configurations can be updated at runtime, and the admin API supports on-demand backfill and resync operations for any repository — making it straightforward to add new tracked collections and catch up on historical data.

Quick start

cargo build --release

cargo run --release -- \
  --db-path /var/lib/ramjet/data \
  --listen-addr 0.0.0.0:8080 \
  --relay-host bsky.network \
  --tracked-collections "app.bsky.feed.*" \
  --forward-collections "app.bsky.**"

Configuration

All options are available as CLI flags or environment variables.

| Flag | Env | Default | Description |
|---|---|---|---|
| --db-path | RAMJET_DB_PATH | ./data/ramjet.db | fjall database directory |
| --listen-addr | RAMJET_LISTEN_ADDR | 0.0.0.0:8080 | HTTP listen address |
| --relay-host | RAMJET_RELAY_HOST | bsky.network | Firehose relay hostname |
| --tracked-collections | RAMJET_TRACKED_COLLECTIONS | * | Collections to persist (space-separated patterns) |
| --forward-collections | RAMJET_FORWARD_COLLECTIONS | * | Collections to forward via WebSocket only |
| --event-retention-hours | RAMJET_EVENT_RETENTION_HOURS | 72 | Hours to retain events |
| --batch-size | RAMJET_BATCH_SIZE | 500 | Ingester batch size |
| --batch-timeout-ms | RAMJET_BATCH_TIMEOUT_MS | 100 | Batch flush timeout (ms) |
| --admin-dids | RAMJET_ADMIN_DIDS | (empty) | Comma-separated admin DIDs for protected endpoints |
| --zstd-dict-path | RAMJET_ZSTD_DICT_PATH | (none) | Path to zstd dictionary for event compression |
| --backfill | RAMJET_BACKFILL | (empty) | Comma-separated DIDs to backfill on startup |
| --consumer-group | RAMJET_CONSUMER_GROUPS | (empty) | Consumer group definitions (name:partitions, comma-separated) |

Collection patterns

  • * — match all collections
  • com.example.feed — exact NSID match
  • com.example.* — single-segment wildcard (matches com.example.feed but not com.example.feed.like)
  • com.example.** — glob wildcard (matches com.example.feed, com.example.feed.like, etc.)
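The pattern semantics above can be sketched as a small matcher. This is an illustrative reimplementation, not Ramjet's actual matching code, which may handle edge cases differently:

```rust
// Hypothetical sketch of the collection-pattern rules described above.
fn matches(pattern: &str, nsid: &str) -> bool {
    if pattern == "*" {
        return true; // match all collections
    }
    if let Some(prefix) = pattern.strip_suffix(".**") {
        // glob wildcard: any number of trailing segments
        return nsid
            .strip_prefix(prefix)
            .map_or(false, |rest| rest.starts_with('.'));
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        // single-segment wildcard: exactly one more segment
        return nsid.strip_prefix(prefix).map_or(false, |rest| {
            rest.starts_with('.') && !rest[1..].contains('.')
        });
    }
    pattern == nsid // exact NSID match
}

fn main() {
    assert!(matches("*", "com.example.feed"));
    assert!(matches("com.example.feed", "com.example.feed"));
    assert!(matches("com.example.*", "com.example.feed"));
    assert!(!matches("com.example.*", "com.example.feed.like"));
    assert!(matches("com.example.**", "com.example.feed.like"));
    println!("ok");
}
```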

Zstd dictionary compression

When --zstd-dict-path is provided, Ramjet compresses events in the events keyspace using a pre-trained zstd dictionary. The dictionary is identified by hash and stored in the meta keyspace so dictionary changes are detected on restart. Clients can download the active dictionary via the GET /dictionary endpoint (supports If-None-Match with CID-based ETags).

Consumer groups

Consumer groups partition events across multiple WebSocket connections by DID hash, similar to Kafka consumer groups. Define groups at startup:

cargo run --release -- \
  --consumer-group indexers:3 \
  --consumer-group notifiers:2

Clients connect with ?group=indexers&partition=0 to receive only events for DIDs that hash to partition 0. All events for the same DID always route to the same partition. Omitting group/partition gives broadcast mode (all events).
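The routing invariant can be illustrated with a hash-mod sketch. The actual hash function Ramjet uses is an assumption here; what matters is that it is deterministic, so the same DID always lands on the same partition:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Kafka-style partition assignment: hash the DID, take it modulo the
// partition count. (Illustrative only; Ramjet's hash may differ.)
fn partition_for(did: &str, partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    did.hash(&mut h);
    h.finish() % partitions
}

fn main() {
    let p1 = partition_for("did:plc:abc123", 3);
    let p2 = partition_for("did:plc:abc123", 3);
    // the same DID always routes to the same partition
    assert_eq!(p1, p2);
    assert!(p1 < 3);
    println!("did:plc:abc123 -> partition {p1}");
}
```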

Architecture

Five async pipelines connected by channels:

Firehose ──→ Ingester ──mpsc──→ Writer ──broadcast──→ WebSocket Fan-out
                                  │          │               ↑
                                  │          └─partitioned──→┤ (consumer groups)
                                  ├─→ fjall storage          │
                                  └─→ events keyspace ───────┘ (cursor replay)

Backfill Worker ──→ CAR fetch ──→ fjall storage
Identity Worker ──→ DID/handle resolution ──→ fjall storage
  1. Firehose Ingester — connects to the relay via WebSocket, decodes CBOR frames (#commit, #identity, #account), and sends events over an mpsc channel. Auto-reconnects with a 2s delay.
  2. Batch Writer — collects events into configurable batches (size + timeout), writes to fjall atomically via WriteBatch, and broadcasts to fan-out channels with priority routing (tracked = high, forwarded = low). Filters denied repos. Queues tooBig commits and desynced repos for backfill.
  3. WebSocket Fan-out — serves dev.ngerakines.ramjet.stream.subscribe with cursor-based replay from the events keyspace, then switches to live streaming with biased priority delivery (high > low > client recv). Supports both broadcast and partitioned (consumer group) modes.
  4. Identity Worker — resolves DID documents and handle mappings via concurrent resolution (bounded by a semaphore), updating the did_to_doc and handle_to_did keyspaces.
  5. Backfill Worker — polls a backfill queue every 5s, fetches full repo CARs from PDS endpoints using DiskRepository (spills to disk for large repos), parses records, and writes to storage. Repos exceeding 5 consecutive failures are auto-denied.
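The batch writer's size-or-timeout flush loop (step 2) can be sketched with a synchronous std channel. The real pipeline uses async channels, fjall WriteBatch, and priority broadcast, none of which appear in this simplified model:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Simplified model of the batch writer: accumulate events until either the
// batch size or the flush timeout is reached, then flush.
fn run_writer(
    rx: mpsc::Receiver<String>,
    batch_size: usize,
    timeout: Duration,
) -> Vec<Vec<String>> {
    let mut flushed = Vec::new();
    let mut batch = Vec::new();
    let mut deadline = Instant::now() + timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(event) => {
                batch.push(event);
                if batch.len() >= batch_size {
                    // size trigger: flush a full batch
                    flushed.push(std::mem::take(&mut batch));
                    deadline = Instant::now() + timeout;
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // timeout trigger: flush whatever has accumulated
                if !batch.is_empty() {
                    flushed.push(std::mem::take(&mut batch));
                }
                deadline = Instant::now() + timeout;
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                if !batch.is_empty() {
                    flushed.push(batch);
                }
                return flushed;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let writer = thread::spawn(move || run_writer(rx, 3, Duration::from_millis(100)));
    for i in 0..7 {
        tx.send(format!("event-{i}")).unwrap();
    }
    drop(tx); // closing the channel drains the final partial batch
    let batches = writer.join().unwrap();
    assert_eq!(batches.iter().map(Vec::len).sum::<usize>(), 7);
    assert!(batches.iter().all(|b| b.len() <= 3));
    println!("{} batches", batches.len());
}
```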

Storage

Eight fjall keyspaces:

| Keyspace | Key format | Value | Purpose |
|---|---|---|---|
| records | did\0collection\0rkey\0rev | CID + DAG-CBOR (empty = tombstone) | Versioned record history |
| events | u64 BE | Compact binary event | Event log for cursor replay |
| meta | string key | varies | Cursor, sequence counter, backfill queue, RIBLT cache |
| repo_state | DID | bytes RepoState (rev + status + denied + backfilled) | Per-repo state |
| did_to_doc | DID | bytes Timestamped JSON DID document | Identity cache |
| handle_to_did | handle | bytes DID string | Handle-to-DID mapping |
| blobs | blob key | blob data | Blob storage |
| blob_meta | blob key | metadata | Blob metadata |

Records are versioned by including the repository revision in the key. Each create/update appends a new entry; deletes append a tombstone (empty value). The latest version of a record is the last entry in a prefix scan over did\0collection\0rkey\0. This provides a built-in change history without a separate temporal keyspace.
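The versioning scheme can be modeled with an ordered map standing in for the fjall keyspace. This toy reimplements the prefix-scan-for-latest idea only; fjall's API and the real value encoding are not shown:

```rust
use std::collections::BTreeMap;

// Toy model of the records keyspace: keys are did\0collection\0rkey\0rev,
// an empty value is a tombstone, and the latest version is the last entry
// under the did\0collection\0rkey\0 prefix.
fn key(did: &str, coll: &str, rkey: &str, rev: &str) -> Vec<u8> {
    format!("{did}\0{coll}\0{rkey}\0{rev}").into_bytes()
}

fn latest<'a>(
    store: &'a BTreeMap<Vec<u8>, Vec<u8>>,
    did: &str,
    coll: &str,
    rkey: &str,
) -> Option<&'a [u8]> {
    let prefix = format!("{did}\0{coll}\0{rkey}\0").into_bytes();
    store
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .last()
        .map(|(_, v)| v.as_slice())
}

fn main() {
    let mut store = BTreeMap::new();
    // create, then update: each write appends a new revision
    store.insert(key("did:plc:x", "app.test.post", "abc", "rev1"), b"v1".to_vec());
    store.insert(key("did:plc:x", "app.test.post", "abc", "rev2"), b"v2".to_vec());
    assert_eq!(latest(&store, "did:plc:x", "app.test.post", "abc"), Some(&b"v2"[..]));
    // delete appends a tombstone (empty value); history is preserved
    store.insert(key("did:plc:x", "app.test.post", "abc", "rev3"), Vec::new());
    assert_eq!(latest(&store, "did:plc:x", "app.test.post", "abc"), Some(&[][..]));
    println!("ok");
}
```

This works because repository revisions (TIDs) sort lexicographically, so the last key under the prefix is always the newest revision.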

Event encoding

Events are stored in a compact binary format with tag-based versioning:

  • V1 tags (0x01-0x03): 1B tag + 8B sequence + variable fields
  • V2 tags (0x04-0x06): 1B tag + 8B sequence + 8B timestamp (microseconds) + variable fields

Commit operations, identity events, and account events each have dedicated compact encodings. The V2 format adds microsecond timestamps for latency tracking. Events are optionally compressed with zstd dictionary compression before storage.
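A V2-style frame can be sketched as follows. Only the header layout stated above (1B tag + 8B BE sequence + 8B BE microsecond timestamp + variable fields) comes from this README; the payload contents and any field beyond the header are assumptions:

```rust
// Hedged sketch of V2 framing: tag, big-endian sequence, big-endian
// microsecond timestamp, then an opaque variable-length payload.
fn encode_v2(tag: u8, seq: u64, ts_micros: u64, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(17 + payload.len());
    buf.push(tag);
    buf.extend_from_slice(&seq.to_be_bytes());
    buf.extend_from_slice(&ts_micros.to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

fn decode_v2(buf: &[u8]) -> Option<(u8, u64, u64, &[u8])> {
    if buf.len() < 17 {
        return None; // header is always 17 bytes
    }
    let tag = buf[0];
    let seq = u64::from_be_bytes(buf[1..9].try_into().ok()?);
    let ts = u64::from_be_bytes(buf[9..17].try_into().ok()?);
    Some((tag, seq, ts, &buf[17..]))
}

fn main() {
    let encoded = encode_v2(0x04, 42, 1_700_000_000_000_000, b"commit");
    let (tag, seq, ts, payload) = decode_v2(&encoded).unwrap();
    assert_eq!((tag, seq, ts), (0x04, 42, 1_700_000_000_000_000));
    assert_eq!(payload, b"commit");
    println!("ok");
}
```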

API

Health and metrics

  • GET /_health — returns {"status":"ok","version":"1.2.0"}
  • GET /metrics — Prometheus text format (includes HTTP metrics, pipeline counters, tokio task metrics, fan-out queue depths)

XRPC endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | com.atproto.repo.getRecord | Fetch a single record by repo/collection/rkey |
| GET | com.atproto.repo.listRecords | List records in a collection with cursor pagination |
| GET | com.atproto.repo.describeRepo | Repo metadata, collections, rev, DID document |
| GET | com.atproto.identity.resolveIdentity | Resolve DID or handle to DID document (supports Cache-Control: max-stale=N) |
| GET | com.atproto.identity.resolveHandle | Resolve handle to DID |
| GET | com.atproto.identity.resolveDid | Fetch DID document |
| GET | dev.ngerakines.ramjet.stream.subscribe | WebSocket event stream with cursor replay and consumer group support |
| GET | dev.ngerakines.ramjet.repos.getReconciliation | RIBLT sketch for set reconciliation |

Set reconciliation (RIBLT)

The getReconciliation endpoint returns an RIBLT sketch for a DID's tracked records, allowing consumers to efficiently determine which records they're missing without transferring the full record list.

GET /xrpc/dev.ngerakines.ramjet.repos.getReconciliation?did=did:plc:abc123

Returns application/x-riblt binary with response headers:

  • ETag — repo rev (supports If-None-Match for 304 Not Modified)
  • X-Riblt-Records — number of records encoded
  • X-Riblt-Rev — repo revision at time of sketch generation

Optional collection parameter filters the sketch to a single collection.

Client workflow:

  1. Build a local sketch from your records using the same symbol encoding (collection\0rkey\0cid)
  2. Fetch Ramjet's sketch
  3. Subtract and decode — the symmetric difference tells you which records you're missing (and which you have that Ramjet doesn't)
  4. Fetch missing records via getRecord or listRecords
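The outcome of steps 1–3 can be illustrated with plain sets standing in for RIBLT sketches: the symmetric difference of the two symbol sets is exactly what subtract-and-decode recovers, without either side shipping its full record list. (Real RIBLT coding is not shown here.)

```rust
use std::collections::HashSet;

// Symbol encoding from the workflow above: collection\0rkey\0cid.
fn symbol(collection: &str, rkey: &str, cid: &str) -> String {
    format!("{collection}\0{rkey}\0{cid}")
}

fn main() {
    let local: HashSet<String> = [symbol("app.test.post", "aaa", "cid1")].into();
    let remote: HashSet<String> = [
        symbol("app.test.post", "aaa", "cid1"),
        symbol("app.test.post", "bbb", "cid2"),
    ]
    .into();

    // records Ramjet has that we are missing
    let missing: HashSet<_> = remote.difference(&local).collect();
    assert_eq!(missing.len(), 1);
    // records we have that Ramjet doesn't
    let extra: HashSet<_> = local.difference(&remote).collect();
    assert!(extra.is_empty());
    println!("missing {} record(s)", missing.len());
}
```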

Sketch sizes are allocated in 25,000-cell increments, bumping to the next tier once the record count exceeds the current size by more than 50%. Sketches are cached per-DID and automatically invalidated when records change.
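One plausible reading of that tiering rule, as a sketch (an assumption, not Ramjet's exact code):

```rust
// Start at 25,000 cells; bump by 25,000 whenever the record count exceeds
// the current size by more than 50%. (Hypothetical reading of the rule.)
const TIER: u64 = 25_000;

fn sketch_cells(record_count: u64) -> u64 {
    let mut cells = TIER;
    while record_count > cells + cells / 2 {
        cells += TIER;
    }
    cells
}

fn main() {
    assert_eq!(sketch_cells(10_000), 25_000);
    assert_eq!(sketch_cells(37_500), 25_000); // exactly 50% past: no bump
    assert_eq!(sketch_cells(40_000), 50_000);
    println!("ok");
}
```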

Admin endpoints (JWT auth required)

| Method | Endpoint | Description |
|---|---|---|
| GET | dev.ngerakines.ramjet.repos.getState | Get repo state (rev, status, denied) |
| POST | dev.ngerakines.ramjet.repos.setState | Set repo denied flag |
| POST | dev.ngerakines.ramjet.repos.resync | Queue a repo for backfill |
| POST | dev.ngerakines.ramjet.repos.rebuildReconciliation | Force rebuild of RIBLT sketch cache |

Other endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /dictionary | Download the active zstd dictionary (supports If-None-Match with CID ETags) |

Development

cargo check          # type-check
cargo test           # run tests (50 unit tests)
cargo fmt            # format code
cargo build          # debug build

Binaries

  • ramjet — main service
  • ramjet-writer — benchmark binary for ingester-to-writer pipeline analysis
  • ramjet-data — data inspection tool
  • ramjet-dictgen — zstd dictionary generator
  • ramjet-forecast — capacity forecasting
  • ramjet-consumer — example WebSocket consumer
  • rjtop — TUI dashboard

License

MIT