tap (firehose sync)#
leaflet-search uses TAP from bluesky-social/indigo to receive real-time events from the ATProto firehose.
what is tap?#
tap subscribes to the ATProto firehose, filters for specific collections (e.g., pub.leaflet.document), and broadcasts matching events to websocket clients. it also does initial crawling/backfilling of existing records.
key behavior: TAP backfills historical data when repos are added. when a repo is added to tracking:
- TAP fetches the full repo from the account's PDS using `com.atproto.sync.getRepo`
- live firehose events during backfill are buffered in memory
- historical events (marked `live: false`) are delivered first
- after historical events complete, buffered live events are released
- subsequent firehose events arrive immediately, marked as `live: true`
TAP enforces strict per-repo ordering - live events are synchronization barriers that require all prior events to complete first.
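the delivery order described above can be sketched as a toy model. this is not TAP's implementation, only an illustration of the stated behavior for a single repo; the class and method names (`RepoBackfillModel`, `on_historical`, `on_live`, `finish_backfill`) and the `rkey` values are hypothetical.

```python
from collections import deque


class RepoBackfillModel:
    """Toy model of per-repo ordering during backfill (not TAP's actual code)."""

    def __init__(self):
        self.backfilling = True        # True while the full-repo fetch is in progress
        self.buffered_live = deque()   # live events held back during backfill
        self.delivered = []            # events in the order a client would see them

    def on_historical(self, event):
        # Records from the full-repo fetch are delivered marked live: false.
        self.delivered.append({**event, "live": False})

    def on_live(self, event):
        event = {**event, "live": True}
        if self.backfilling:
            self.buffered_live.append(event)  # hold until backfill completes
        else:
            self.delivered.append(event)      # steady state: deliver immediately

    def finish_backfill(self):
        # Buffered live events are released only after historical events are out.
        self.backfilling = False
        while self.buffered_live:
            self.delivered.append(self.buffered_live.popleft())


model = RepoBackfillModel()
model.on_live({"rkey": "c"})         # arrives mid-backfill, so it is buffered
model.on_historical({"rkey": "a"})
model.on_historical({"rkey": "b"})
model.finish_backfill()
model.on_live({"rkey": "d"})         # after backfill, delivered right away

order = [(e["rkey"], e["live"]) for e in model.delivered]
print(order)
```

a consumer can rely on this ordering to treat `live: false` events as a bulk load and `live: true` events as incremental updates.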
message format#
TAP sends JSON messages over websocket. record events look like:
{
  "type": "record",
  "record": {
    "live": true,
    "did": "did:plc:abc123...",
    "rev": "3mbspmpaidl2a",
    "collection": "pub.leaflet.document",
    "rkey": "3lzyrj6q6gs27",
    "action": "create",
    "record": { ... },
    "cid": "bafyrei..."
  }
}
field types (important!)#
| field | type | values | notes |
|---|---|---|---|
| type | string | "record", "identity", "account" | message type |
| action | string | "create", "update", "delete" | NOT an enum! |
| live | bool | true/false | true = firehose, false = resync |
| collection | string | e.g., "pub.leaflet.document" | lexicon collection |
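as a minimal sketch of consuming these fields, the parser below treats `action` as a plain string and compares it against the three documented values. the function name `parse_record_event`, the sample DID, and the sample record body are hypothetical; only the field names come from the message format above.

```python
import json

# The three action values listed in the field table, compared as plain strings.
KNOWN_ACTIONS = {"create", "update", "delete"}


def parse_record_event(raw: str):
    """Return a flat dict for a record message, or None for anything else."""
    msg = json.loads(raw)
    if msg.get("type") != "record":
        return None  # "identity" and "account" messages are skipped in this sketch
    rec = msg.get("record") or {}
    action = rec.get("action")
    # action is a JSON string, so use string comparison rather than enum decoding
    if action not in KNOWN_ACTIONS:
        return None
    return {
        "did": rec.get("did"),
        "collection": rec.get("collection"),
        "rkey": rec.get("rkey"),
        "action": action,
        "live": bool(rec.get("live")),
        "record": rec.get("record"),  # None if the field is missing
    }


# Hypothetical sample message using the documented field names.
sample = json.dumps({
    "type": "record",
    "record": {
        "live": True,
        "did": "did:plc:example",
        "rev": "3mbspmpaidl2a",
        "collection": "pub.leaflet.document",
        "rkey": "3lzyrj6q6gs27",
        "action": "create",
        "record": {"title": "hello"},
        "cid": "bafyexample",
    },
})

event = parse_record_event(sample)
print(event["action"], event["collection"], event["live"])
```

returning `None` for unknown message types or actions keeps failures visible to the caller instead of silently dropping fields.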
gotchas#
- action is a string, not an enum - TAP sends `"action": "create"` as a JSON string. if your parser expects an enum type, extraction will silently fail. use string comparison.
- collection filters apply to output - `TAP_COLLECTION_FILTERS` controls which records TAP sends to clients. records from other collections are fetched but not forwarded.
- signal collection vs collection filters - `TAP_SIGNAL_COLLECTION` controls auto-discovery of repos (which repos to track), while `TAP_COLLECTION_FILTERS` controls which records from those repos to output. a repo must either be auto-discovered via signal collection OR manually added via `/repos/add`.
- silent extraction failures - if using zat's `extractAt`, enable debug logging to see why parsing fails:

      pub const std_options = .{
          .log_scope_levels = &.{.{ .scope = .zat, .level = .debug }},
      };

  this will show messages like:

      debug(zat): extractAt: parse failed for Op at path { "op" }: InvalidEnumTag
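the difference between tracking a repo and forwarding its records can be modeled with two small predicates. this is a sketch of the semantics described in the gotchas above, not TAP's code: the function names are hypothetical, exact string matching is a simplifying assumption, and the second collection name is only an example value.

```python
def should_track_repo(repo_collections, signal_collection, manually_added=False):
    """A repo is tracked if it was added by hand or contains the signal collection."""
    return manually_added or signal_collection in repo_collections


def should_forward(collection, collection_filters):
    """Only records whose collection is listed in the filters reach clients."""
    return collection in collection_filters


signal = "pub.leaflet.document"              # stands in for TAP_SIGNAL_COLLECTION
filters = {"pub.leaflet.document"}           # stands in for TAP_COLLECTION_FILTERS

# Repo with a leaflet document: auto-discovered via the signal collection.
tracked_auto = should_track_repo({"pub.leaflet.document", "app.example.other"}, signal)

# Repo without the signal collection: only tracked if added via /repos/add.
tracked_none = should_track_repo({"app.example.other"}, signal)
tracked_manual = should_track_repo({"app.example.other"}, signal, manually_added=True)

# Records outside the filters are fetched but never forwarded.
forward_doc = should_forward("pub.leaflet.document", filters)
forward_other = should_forward("app.example.other", filters)

print(tracked_auto, tracked_none, tracked_manual, forward_doc, forward_other)
```

the manually added repo in this model is tracked but still produces no output, because none of its collections are in the filters.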
debugging#
check tap connection#
fly logs -a leaflet-search-tap --no-tail | tail -30
look for:
- "connected to firehose" - successfully connected to bsky relay
- "websocket connected" - backend connected to TAP
- "dialing failed" / "i/o timeout" - network issues
check backend is receiving#
fly logs -a leaflet-search-backend --no-tail | grep -E "(tap|indexed)"
look for:
- `tap connected!` - connected to TAP
- `tap: msg_type=record` - receiving messages
- `indexed document:` - successfully processing
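when scanning longer log output, the marker strings from both checks can be matched mechanically. the sketch below assumes only that each marker appears as a substring of a log line; the helper names (`classify`, `summarize`) and the sample lines are hypothetical, and real log lines will carry extra prefixes such as timestamps.

```python
# Marker substrings from the two "look for" lists, mapped to what they indicate.
MARKERS = [
    ("connected to firehose", "tap: connected to bsky relay"),
    ("websocket connected", "tap: backend connected to TAP"),
    ("dialing failed", "tap: network issue"),
    ("i/o timeout", "tap: network issue"),
    ("tap connected!", "backend: connected to TAP"),
    ("tap: msg_type=record", "backend: receiving messages"),
    ("indexed document:", "backend: successfully processing"),
]


def classify(line):
    """Return the meaning of the first marker found in the line, else None."""
    for needle, meaning in MARKERS:
        if needle in line:
            return meaning
    return None


def summarize(lines):
    """Collect distinct meanings in the order they first appear."""
    seen = []
    for line in lines:
        meaning = classify(line)
        if meaning is not None and meaning not in seen:
            seen.append(meaning)
    return seen


# Hypothetical sample lines; only the marker substrings come from the lists above.
sample_lines = [
    "tap connected!",
    "tap: msg_type=record",
    "unrelated line",
    "indexed document: example",
    "dialing failed: lookup ... i/o timeout",
]

summary = summarize(sample_lines)
print(summary)
```

if the summary for the backend lacks "backend: receiving messages" while TAP reports a connection, the problem is likely between the two services rather than at the relay.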
common issues#
| symptom | cause | fix |
|---|---|---|
| `websocket handshake failed: error.Timeout` | TAP not running or network issue | restart TAP, check regions match |
| `dialing failed: lookup ... i/o timeout` | DNS issues reaching bsky relay | restart TAP, transient network issue |
| messages received but not indexed | extraction failing (type mismatch) | enable zat debug logging, check field types |
| repo shows `records: 0` after adding | resync failed or collection not in filters | check TAP logs for resync errors, verify `TAP_COLLECTION_FILTERS` |
| new platform records not appearing | platform's collection not in `TAP_COLLECTION_FILTERS` | add collection to filters, restart TAP |
TAP API endpoints#
TAP exposes HTTP endpoints for monitoring and control:
| endpoint | description |
|---|---|
| `/health` | health check |
| `/stats/repo-count` | number of tracked repos |
| `/stats/record-count` | total records processed |
| `/stats/outbox-buffer` | events waiting to be sent |
| `/stats/resync-buffer` | DIDs waiting to be resynced |
| `/stats/cursors` | firehose cursor position |
| `/info/:did` | repo status: `{"did":"...","state":"active","records":N}` |
| `/repos/add` | POST with `{"dids":["did:plc:..."]}` to add repos |
| `/repos/remove` | POST with `{"dids":["did:plc:..."]}` to remove repos |
example: check repo status
fly ssh console -a leaflet-search-tap -C "curl -s localhost:2480/info/did:plc:abc123"
example: manually add a repo for backfill
fly ssh console -a leaflet-search-tap -C 'curl -X POST -H "Content-Type: application/json" -d "{\"dids\":[\"did:plc:abc123\"]}" localhost:2480/repos/add'
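the same calls can be scripted. the sketch below uses only the Python standard library and assumes TAP is reachable at `localhost:2480`, as in the curl examples above; the helper names (`build_repos_request`, `repo_info`) are hypothetical. building the request is kept separate from sending it so the payload can be inspected first.

```python
import json
import urllib.request

TAP_BASE = "http://localhost:2480"  # port taken from the curl examples above


def build_repos_request(dids, action="add", base=TAP_BASE):
    """Build (but do not send) a POST for /repos/add or /repos/remove."""
    if action not in ("add", "remove"):
        raise ValueError(f"unsupported action: {action}")
    body = json.dumps({"dids": list(dids)}).encode("utf-8")
    return urllib.request.Request(
        f"{base}/repos/{action}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def repo_info(did, base=TAP_BASE, timeout=5):
    """GET /info/:did and decode the JSON status (requires a reachable TAP)."""
    with urllib.request.urlopen(f"{base}/info/{did}", timeout=timeout) as resp:
        return json.load(resp)


# Inspect the request without touching the network.
req = build_repos_request(["did:plc:abc123"])
print(req.get_method(), req.full_url, req.data.decode())
```

to actually send it, pass the request to `urllib.request.urlopen(req)` from a host that can reach TAP, then call `repo_info("did:plc:abc123")` to watch the record count.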
fly.io deployment#
both TAP and backend should be in the same region for internal networking:
# check current regions
fly status -a leaflet-search-tap
fly status -a leaflet-search-backend
# restart TAP if needed
fly machine restart -a leaflet-search-tap <machine-id>
note: changing primary_region in fly.toml only affects new machines. to move an existing machine, clone it to the new region and destroy the old one.