# tap (firehose sync)

leaflet-search uses [tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) from bluesky-social/indigo to receive real-time events from the ATProto firehose.

## what is tap?

tap subscribes to the ATProto firehose, filters for specific collections (e.g., `pub.leaflet.document`), and broadcasts matching events to websocket clients. it also does initial crawling/backfilling of existing records.

key behavior: **tap backfills historical data when repos are added**. when a repo is added to tracking:
1. tap fetches the full repo from the account's PDS using `com.atproto.sync.getRepo`
2. live firehose events during backfill are buffered in memory
3. historical events (marked `live: false`) are delivered first
4. after historical events complete, buffered live events are released
5. subsequent firehose events arrive immediately marked as `live: true`

tap enforces strict per-repo ordering - live events are synchronization barriers that require all prior events to complete first.
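
the backfill ordering described above can be modeled with a small python sketch. this is a toy model for reasoning about what a websocket client sees, not tap's actual implementation; `RepoBackfillModel`, its methods, and `demo` are hypothetical names made up for illustration.

```python
from collections import deque


class RepoBackfillModel:
    """toy model of per-repo event ordering during backfill (hypothetical, not tap's code)."""

    def __init__(self):
        self.backfilling = True        # repo was just added, backfill in progress
        self.buffered_live = deque()   # live events held back during backfill
        self.delivered = []            # what a websocket client would see, in order

    def on_historical(self, event):
        # records read from the fetched repo are delivered with live: false
        self.delivered.append({**event, "live": False})

    def on_firehose(self, event):
        live_event = {**event, "live": True}
        if self.backfilling:
            self.buffered_live.append(live_event)   # step 2: buffer in memory
        else:
            self.delivered.append(live_event)       # step 5: deliver immediately

    def finish_backfill(self):
        # step 4: release buffered live events in arrival order
        self.backfilling = False
        while self.buffered_live:
            self.delivered.append(self.buffered_live.popleft())


def demo():
    repo = RepoBackfillModel()
    repo.on_firehose({"rkey": "c"})      # arrives mid-backfill, gets buffered
    repo.on_historical({"rkey": "a"})
    repo.on_historical({"rkey": "b"})
    repo.finish_backfill()
    repo.on_firehose({"rkey": "d"})      # backfill done, delivered right away
    return [(e["rkey"], e["live"]) for e in repo.delivered]
```

in this model `demo()` returns the historical records `a` and `b` first (with `live` false), then the buffered `c`, then `d`, even though `c` arrived before any historical record.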

## message format

tap sends JSON messages over websocket. record events look like:

```json
{
  "type": "record",
  "record": {
    "live": true,
    "did": "did:plc:abc123...",
    "rev": "3mbspmpaidl2a",
    "collection": "pub.leaflet.document",
    "rkey": "3lzyrj6q6gs27",
    "action": "create",
    "record": { ... },
    "cid": "bafyrei..."
  }
}
```

### field types (important!)

| field | type | values | notes |
|-------|------|--------|-------|
| type | string | "record", "identity", "account" | message type |
| action | **string** | "create", "update", "delete" | NOT an enum! |
| live | bool | true/false | true = firehose, false = resync |
| collection | string | e.g., "pub.leaflet.document" | lexicon collection |

## gotchas

1. **action is a string, not an enum** - tap sends `"action": "create"` as a JSON string. if your parser expects an enum type, extraction will silently fail. use string comparison.

2. **collection filters apply to output** - `TAP_COLLECTION_FILTERS` controls which records tap sends to clients. records from other collections are fetched but not forwarded.

3. **signal collection vs collection filters** - `TAP_SIGNAL_COLLECTION` controls auto-discovery of repos (which repos to track), while `TAP_COLLECTION_FILTERS` controls which records from those repos to output. a repo must either be auto-discovered via signal collection OR manually added via `/repos/add`.

4. **silent extraction failures** - if using zat's `extractAt`, enable debug logging to see why parsing fails:
   ```zig
   pub const std_options = .{
       .log_scope_levels = &.{.{ .scope = .zat, .level = .debug }},
   };
   ```
   this will show messages like:
   ```
   debug(zat): extractAt: parse failed for Op at path { "op" }: InvalidEnumTag
   ```
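
to make gotcha 1 concrete, here is a minimal python sketch of a consumer that treats `action` as a plain string and fails loudly instead of silently. it is python for brevity only; `parse_record_event` and `KNOWN_ACTIONS` are hypothetical names and are not part of tap or zat. the field names come from the message format shown earlier.

```python
import json

# the three action values listed in the field types table
KNOWN_ACTIONS = {"create", "update", "delete"}


def parse_record_event(raw):
    """return the routing fields of a record message, or None for other message types."""
    msg = json.loads(raw)
    if msg.get("type") != "record":
        return None  # "identity" and "account" messages are not handled in this sketch
    rec = msg["record"]
    action = rec.get("action")
    # compare as a plain string; raise instead of dropping the event silently
    if not isinstance(action, str) or action not in KNOWN_ACTIONS:
        raise ValueError(f"unexpected action: {action!r}")
    return {
        "action": action,
        "live": rec["live"],
        "did": rec["did"],
        "collection": rec["collection"],
        "rkey": rec["rkey"],
    }
```

raising on an unknown value is a design choice for the sketch: it turns a type mismatch into a visible error rather than a message that is received but never indexed.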

## memory and performance tuning

tap loads **entire repo CARs into memory** during resync. some bsky users have repos that are 100-300MB+. this causes spiky memory usage that can OOM the machine.

### recommended settings for leaflet-search

```toml
[[vm]]
  memory = '2gb'  # 1gb is not enough

[env]
  TAP_RESYNC_PARALLELISM = '1'    # only one repo CAR in memory at a time (default: 5)
  TAP_FIREHOSE_PARALLELISM = '5'  # concurrent event processors (default: 10)
  TAP_OUTBOX_CAPACITY = '10000'   # event buffer size (default: 100000)
  TAP_IDENT_CACHE_SIZE = '10000'  # identity cache entries (default: 2000000)
```

### why these values?

- **2GB memory**: 1GB causes OOM kills when resyncing large repos
- **resync parallelism 1**: prevents multiple large CARs in memory simultaneously
- **lower firehose/outbox**: we track ~1000 repos, not millions - defaults are overkill
- **smaller ident cache**: we don't need 2M cached identities

if tap keeps OOM'ing, check logs for large repo resyncs. the `[0-9]{8,}` pattern only matches sizes with 8 or more digits, i.e. 10,000,000 and up (about 10MB+, assuming `size` is logged in bytes):
```bash
fly logs -a leaflet-search-tap | grep "parsing repo CAR" | grep -E "size\":[0-9]{8,}"
```

## quick status check

from the `tap/` directory:
```bash
just check
```

shows tap machine state, most recent indexed date, and 7-day timeline. useful for verifying indexing is working after restarts.

example output:
```
=== tap status ===
app 781417db604d48 23 ewr started ...

=== Recent Indexing Activity ===
Last indexed: 2026-01-08 (14 docs)
Today: 2026-01-11
Docs: 3742 | Pubs: 1231

=== Timeline (last 7 days) ===
2026-01-08: 14 docs
2026-01-07: 29 docs
...
```

if "Last indexed" is more than a day behind "Today", tap may be down or catching up.
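
the "more than a day behind" rule can be written down as a tiny check. this is only a sketch of the comparison, assuming both dates are in the `YYYY-MM-DD` form shown in the example output; `indexing_lag_days` and `looks_stale` are hypothetical helpers, not part of the `just check` recipe.

```python
from datetime import date


def indexing_lag_days(last_indexed, today):
    """whole days between the 'Last indexed' and 'Today' dates."""
    return (date.fromisoformat(today) - date.fromisoformat(last_indexed)).days


def looks_stale(last_indexed, today, max_lag_days=1):
    """true when indexing is more than max_lag_days behind."""
    return indexing_lag_days(last_indexed, today) > max_lag_days
```

with the example output above, `indexing_lag_days("2026-01-08", "2026-01-11")` is 3, so `looks_stale` returns true and tap is worth a look.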

## checking catch-up progress

when tap restarts after downtime, it replays the firehose from its saved cursor. to check progress:

```bash
# see current firehose position (look for timestamps in log messages)
fly logs -a leaflet-search-tap | grep -E '"time".*"seq"' | tail -3
```

the `"time"` field in log messages shows the firehose timestamp tap is currently processing. compare it to the current time to see how far behind tap is and to estimate catch-up.

catch-up speed varies:
- **~0.3x** when resync queue is full (large repos being fetched)
- **~1x or faster** once resyncs clear
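
a rough way to turn these multipliers into an estimate, assuming they are relative to real time (an interpretation, the numbers above are not defined more precisely): new events keep arriving at 1x, so the lag only shrinks by `speed - 1` seconds per wall-clock second. `catch_up_seconds` is a hypothetical helper for the arithmetic.

```python
def catch_up_seconds(lag_seconds, replay_speed):
    """estimate wall-clock seconds until tap reaches the head of the firehose.

    replay_speed is firehose seconds processed per wall-clock second
    (assumed meaning of "~0.3x" and "~1x").
    returns None when replay_speed <= 1, because the lag is then not shrinking.
    """
    if replay_speed <= 1.0:
        return None
    return lag_seconds / (replay_speed - 1.0)
```

under that assumption, a one hour lag replayed at 2x takes about an hour to clear (`catch_up_seconds(3600, 2.0)` is `3600.0`), while at ~0.3x the function returns `None`: tap is falling further behind until the resync queue drains.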

## debugging

### check tap connection
```bash
fly logs -a leaflet-search-tap --no-tail | tail -30
```

look for:
- `"connected to firehose"` - successfully connected to bsky relay
- `"websocket connected"` - backend connected to tap
- `"dialing failed"` / `"i/o timeout"` - network issues

### check backend is receiving
```bash
fly logs -a leaflet-search-backend --no-tail | grep -E "(tap|indexed)"
```

look for:
- `tap connected!` - connected to tap
- `tap: msg_type=record` - receiving messages
- `indexed document:` - successfully processing

### common issues

| symptom | cause | fix |
|---------|-------|-----|
| tap machine stopped, `oom_killed=true` | large repo CARs exhausted memory | increase memory to 2GB, reduce `TAP_RESYNC_PARALLELISM` to 1 |
| `websocket handshake failed: error.Timeout` | tap not running or network issue | restart tap, check regions match |
| `dialing failed: lookup ... i/o timeout` | DNS issues reaching bsky relay | restart tap, transient network issue |
| messages received but not indexed | extraction failing (type mismatch) | enable zat debug logging, check field types |
| repo shows `records: 0` after adding | resync failed or collection not in filters | check tap logs for resync errors, verify `TAP_COLLECTION_FILTERS` |
| new platform records not appearing | platform's collection not in `TAP_COLLECTION_FILTERS` | add collection to filters, restart tap |
| indexing stopped, tap shows "started" | tap catching up from downtime | check firehose position in logs, wait for catch-up |

## tap API endpoints

tap exposes HTTP endpoints for monitoring and control:

| endpoint | description |
|----------|-------------|
| `/health` | health check |
| `/stats/repo-count` | number of tracked repos |
| `/stats/record-count` | total records processed |
| `/stats/outbox-buffer` | events waiting to be sent |
| `/stats/resync-buffer` | DIDs waiting to be resynced |
| `/stats/cursors` | firehose cursor position |
| `/info/:did` | repo status: `{"did":"...","state":"active","records":N}` |
| `/repos/add` | POST with `{"dids":["did:plc:..."]}` to add repos |
| `/repos/remove` | POST with `{"dids":["did:plc:..."]}` to remove repos |

example: check repo status
```bash
fly ssh console -a leaflet-search-tap -C "curl -s localhost:2480/info/did:plc:abc123"
```

example: manually add a repo for backfill
```bash
fly ssh console -a leaflet-search-tap -C 'curl -X POST -H "Content-Type: application/json" -d "{\"dids\":[\"did:plc:abc123\"]}" localhost:2480/repos/add'
```
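
the same `/repos/add` call can be sketched in python for scripting, using only the standard library. this assumes the script runs somewhere that can reach tap's HTTP port (the curl examples run inside the machine via `fly ssh console`); `TAP_URL`, `build_add_repos_request`, and `add_repos` are hypothetical names, and the response body is not described here, so the sketch only returns the HTTP status code.

```python
import json
import urllib.request

# port taken from the curl examples; the host is an assumption
TAP_URL = "http://localhost:2480"


def build_add_repos_request(dids, base_url=TAP_URL):
    """build (but don't send) a POST to /repos/add with a {"dids": [...]} body."""
    body = json.dumps({"dids": list(dids)}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/repos/add",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def add_repos(dids, base_url=TAP_URL, timeout=10):
    """send the request and return the HTTP status code."""
    req = build_add_repos_request(dids, base_url)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

splitting the request construction from the send keeps the payload easy to inspect without a running tap instance.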

## fly.io deployment

both tap and backend should be in the same region for internal networking:

```bash
# check current regions
fly status -a leaflet-search-tap
fly status -a leaflet-search-backend

# restart tap if needed
fly machine restart -a leaflet-search-tap <machine-id>
```

note: changing `primary_region` in fly.toml only affects new machines. to move existing machines, clone them to the new region and destroy the old ones.

## references

- [tap source (bluesky-social/indigo)](https://github.com/bluesky-social/indigo/tree/main/cmd/tap)
- [ATProto firehose docs](https://atproto.com/specs/sync#firehose)