# tap (firehose sync)

leaflet-search uses [tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) from bluesky-social/indigo to receive real-time events from the ATProto firehose.

## what is tap?

tap subscribes to the ATProto firehose, filters for specific collections (e.g., `pub.leaflet.document`), and broadcasts matching events to websocket clients. it also crawls and backfills existing records.

key behavior: **tap backfills historical data when repos are added**. when a repo is added to tracking:

1. tap fetches the full repo from the account's PDS using `com.atproto.sync.getRepo`
2. live firehose events that arrive during the backfill are buffered in memory
3. historical events (marked `live: false`) are delivered first
4. once the historical events are done, the buffered live events are released
5. subsequent firehose events are delivered immediately, marked `live: true`

tap enforces strict per-repo ordering: live events act as synchronization barriers, so a live event is not delivered until all prior events for that repo have completed.

## message format

tap sends JSON messages over the websocket. record events look like:

```json
{
  "type": "record",
  "record": {
    "live": true,
    "did": "did:plc:abc123...",
    "rev": "3mbspmpaidl2a",
    "collection": "pub.leaflet.document",
    "rkey": "3lzyrj6q6gs27",
    "action": "create",
    "record": { ... },
    "cid": "bafyrei..."
  }
}
```

### field types (important!)

| field | type | values | notes |
|-------|------|--------|-------|
| type | string | "record", "identity", "account" | message type |
| action | **string** | "create", "update", "delete" | NOT an enum! |
| live | bool | true/false | true = live firehose, false = backfill/resync |
| collection | string | e.g., "pub.leaflet.document" | lexicon collection NSID |

## gotchas

1. **action is a string, not an enum** - tap sends `"action": "create"` as a JSON string. if your parser expects an enum type for this field, extraction silently fails. parse it as a string and use string comparison.

2. **collection filters apply to output** - `TAP_COLLECTION_FILTERS` controls which records tap sends to clients. records from other collections are still fetched but are not forwarded.

3. **signal collection vs collection filters** - `TAP_SIGNAL_COLLECTION` controls auto-discovery of repos (which repos to track), while `TAP_COLLECTION_FILTERS` controls which records from those repos are output. a repo must either be auto-discovered via the signal collection OR added manually via `/repos/add`.

4. **silent extraction failures** - if you use zat's `extractAt`, enable debug logging in your root source file to see why parsing fails:
   ```zig
   const std = @import("std");

   // raise the log level to debug for the `zat` scope only
   pub const std_options: std.Options = .{
       .log_scope_levels = &.{.{ .scope = .zat, .level = .debug }},
   };
   ```
   this shows messages like:
   ```
   debug(zat): extractAt: parse failed for Op at path { "op" }: InvalidEnumTag
   ```

## memory and performance tuning

tap loads **entire repo CARs into memory** during resync. some bsky users have repos of 100-300MB or more, so resyncs cause spiky memory usage that can OOM the machine.

### recommended settings for leaflet-search

```toml
[[vm]]
  memory = '2gb'  # 1gb is not enough

[env]
  TAP_RESYNC_PARALLELISM = '1'     # only one repo CAR in memory at a time (default: 5)
  TAP_FIREHOSE_PARALLELISM = '5'   # concurrent event processors (default: 10)
  TAP_OUTBOX_CAPACITY = '10000'    # event buffer size (default: 100000)
  TAP_IDENT_CACHE_SIZE = '10000'   # identity cache entries (default: 2000000)
```

### why these values?
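a back-of-envelope bound makes the first two settings concrete. assume, as a simplification, that each concurrent resync holds one whole CAR about the size of the largest repo. under that assumption, CAR memory scales as `TAP_RESYNC_PARALLELISM` times the repo size, and decode overhead, the outbox, and caches come on top of it. here is a minimal bash sketch using a hypothetical `car_memory_mb` helper (not part of tap or this repo) and the 300MB upper figure quoted above:

```bash
# hypothetical helper: rough lower bound on memory (MB) held by repo CARs
# during resync. assumes each in-flight resync holds one whole CAR as large
# as the biggest repo; ignores decode overhead, the outbox, and caches.
car_memory_mb() {
  local resync_parallelism=$1
  local largest_car_mb=$2
  echo $(( resync_parallelism * largest_car_mb ))
}

car_memory_mb 5 300   # default TAP_RESYNC_PARALLELISM: prints 1500
car_memory_mb 1 300   # recommended setting: prints 300
```

at the default parallelism of 5, the CARs alone could reach ~1.5GB, more than a 1GB VM has in total. at 1, the bound drops to ~300MB, which leaves room within 2GB for everything else.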
- **2GB memory**: 1GB causes OOM kills when resyncing large repos
- **resync parallelism 1**: prevents multiple large CARs from being in memory simultaneously
- **lower firehose parallelism/outbox capacity**: we track ~1000 repos, not millions, so the defaults are overkill
- **smaller ident cache**: we don't need 2M cached identities

if tap keeps OOM'ing, check the logs for large repo resyncs:
```bash
# keeps only lines whose "size" value has 8+ digits; streams until Ctrl-C
fly logs -a leaflet-search-tap | grep "parsing repo CAR" | grep -E "size\":[0-9]{8,}"
```

## quick status check

from the `tap/` directory:
```bash
just check
```

this shows the tap machine state, the most recent indexed date, and a 7-day timeline. it's useful for verifying that indexing works after restarts.

example output:
```
=== tap status ===
app 781417db604d48 23 ewr started ...

=== Recent Indexing Activity ===
Last indexed: 2026-01-08 (14 docs)
Today: 2026-01-11
Docs: 3742 | Pubs: 1231

=== Timeline (last 7 days) ===
2026-01-08: 14 docs
2026-01-07: 29 docs
...
```

if "Last indexed" is more than a day behind "Today", tap may be down or catching up.

## checking catch-up progress

when tap restarts after downtime, it replays the firehose from its saved cursor. to check progress:

```bash
# see the current firehose position (look for timestamps in the log messages)
# --no-tail makes fly logs exit instead of streaming, so `tail` can print
fly logs -a leaflet-search-tap --no-tail | grep -E '"time".*"seq"' | tail -3
```

the `"time"` field in these log messages marks the firehose position. compare it to the current time to see how far behind tap is and estimate the remaining catch-up.
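doing that comparison by hand gets tedious during a long replay. below is a minimal bash sketch with a hypothetical `lag_seconds` helper that is not part of tap or this repo. it assumes the log line carries an RFC 3339 value like `"time":"2026-01-11T12:00:00Z"`. it also assumes GNU `date`, since BSD/macOS `date` has no `-d`:

```bash
# hypothetical helper: seconds between a log line's "time" field and now.
# assumes an RFC 3339 value (e.g. "time":"2026-01-11T12:00:00Z") and GNU date.
# an optional second argument overrides "now" (epoch seconds).
lag_seconds() {
  local line=$1
  local now=${2:-$(date -u +%s)}
  local ts event_epoch

  # extract the "time" value; the greedy .* means the LAST "time" field wins
  ts=$(printf '%s\n' "$line" | sed -n 's/.*"time": *"\([^"]*\)".*/\1/p')
  if [ -z "$ts" ]; then
    echo 'no "time" field found' >&2
    return 1
  fi

  # convert to epoch seconds (GNU date only) and print the difference
  event_epoch=$(date -u -d "$ts" +%s) || return 1
  echo $(( now - event_epoch ))
}
```

for example, `lag_seconds "$(fly logs -a leaflet-search-tap --no-tail | grep -E '"time".*"seq"' | tail -1)"` prints the lag in seconds. running it twice a few minutes apart shows whether the lag is shrinking.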
catch-up speed varies:
- **~0.3x** while the resync queue is full (large repos being fetched)
- **~1x or faster** once resyncs clear

## debugging

### check tap connection
```bash
fly logs -a leaflet-search-tap --no-tail | tail -30
```

look for:
- `"connected to firehose"` - successfully connected to the bsky relay
- `"websocket connected"` - backend connected to tap
- `"dialing failed"` / `"i/o timeout"` - network issues

### check backend is receiving
```bash
fly logs -a leaflet-search-backend --no-tail | grep -E "(tap|indexed)"
```

look for:
- `tap connected!` - connected to tap
- `tap: msg_type=record` - receiving messages
- `indexed document:` - successfully processing

### common issues

| symptom | cause | fix |
|---------|-------|-----|
| tap machine stopped, `oom_killed=true` | large repo CARs exhausted memory | increase memory to 2GB, reduce `TAP_RESYNC_PARALLELISM` to 1 |
| `websocket handshake failed: error.Timeout` | tap not running or network issue | restart tap, check that regions match |
| `dialing failed: lookup ... i/o timeout` | DNS issues reaching the bsky relay | usually transient; restart tap |
| messages received but not indexed | extraction failing (type mismatch) | enable zat debug logging, check field types |
| repo shows `records: 0` after adding | resync failed or collection not in filters | check tap logs for resync errors, verify `TAP_COLLECTION_FILTERS` |
| new platform's records not appearing | platform's collection not in `TAP_COLLECTION_FILTERS` | add the collection to the filters, restart tap |
| indexing stopped, tap shows "started" | tap catching up after downtime | check the firehose position in the logs, wait for catch-up |

## tap API endpoints

tap exposes HTTP endpoints for monitoring and control:

| endpoint | description |
|----------|-------------|
| `/health` | health check |
| `/stats/repo-count` | number of tracked repos |
| `/stats/record-count` | total records processed |
| `/stats/outbox-buffer` | events waiting to be sent |
| `/stats/resync-buffer` | DIDs waiting to be resynced |
| `/stats/cursors` | firehose cursor position |
| `/info/:did` | repo status: `{"did":"...","state":"active","records":N}` |
| `/repos/add` | POST with `{"dids":["did:plc:..."]}` to add repos |
| `/repos/remove` | POST with `{"dids":["did:plc:..."]}` to remove repos |

example: check repo status
```bash
fly ssh console -a leaflet-search-tap -C "curl -s localhost:2480/info/did:plc:abc123"
```

example: manually add a repo for backfill
```bash
fly ssh console -a leaflet-search-tap -C 'curl -X POST -H "Content-Type: application/json" -d "{\"dids\":[\"did:plc:abc123\"]}" localhost:2480/repos/add'
```

## fly.io deployment

both tap and backend should be in the same region for internal networking:

```bash
# check current regions
fly status -a leaflet-search-tap
fly status -a leaflet-search-backend

# restart tap if needed (machine IDs are listed by `fly status`)
fly machine restart -a leaflet-search-tap <machine-id>
```

note: changing `primary_region` in fly.toml only affects new machines. to move existing machines, clone each one into the new region (e.g. `fly machine clone <machine-id> --region <region>`) and destroy the old one.

## references

- [tap source (bluesky-social/indigo)](https://github.com/bluesky-social/indigo/tree/main/cmd/tap)
- [ATProto firehose docs](https://atproto.com/specs/sync#firehose)