# Tapfall

A Ruby gem for ingesting ATProto repository data from a [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) service (extension of the [Skyfall](https://tangled.org/mackuba.eu/skyfall) gem).

> [!NOTE]
> Part of ATProto Ruby SDK: [ruby.sdk.blue](https://ruby.sdk.blue)


## What does it do

[Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is a [tool made by Bluesky](https://docs.bsky.app/blog/introducing-tap) that combines a firehose client/adapter with JSON output (like [Jetstream](https://github.com/bluesky-social/jetstream)) and a repository/PDS crawler and importer. It can be useful if you're building a backend app that needs to import and store some set of records from the Atmosphere. It's meant to be run locally on your app's server, with only your app connecting to it, and it saves you a lot of code if you need to both import & backfill existing records of some kind and also stream any new ones that are added after you start the import.

Basically, before Tap, your app code needed to:

1) Connect to a relay or Jetstream
2) Filter only records of selected kinds
3) Find out which repos on which PDSes have records that are relevant to you
4) Connect to those PDSes and get those records via `listRecords` or `getRepo`
5) Handle possible duplicates between the imported repo and the firehose

Even if you only want some kinds of records from some specific repos, that still leaves you with both a firehose client and a repo importer, and with merging the results from both somehow.

With Tap, you only need to:

1) Run Tap, passing it a list of record types to sync via command line parameters or environment variables
2) Connect to the Tap stream on localhost
3) Save everything coming from the stream

So instead of two ways of importing the records, you only have one, and it's the much less involved one. This library also handles a lot of that for you.

**Tapfall** is an extension of [Skyfall](https://tangled.org/mackuba.eu/skyfall), which is a gem for streaming records from a relay/PDS firehose or Jetstream, and it adds support for the event format used by Tap and for some additional HTTP APIs it provides.


## Installation

Tapfall should run on any reasonably recent version of Ruby (3.x/4.x), although it's recommended to use one that's still getting maintenance updates, ideally the latest one. In production, it's also recommended to install Ruby with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready) and with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). A compatible version should be available on most Linux systems; otherwise you can install one using tools such as [RVM](https://rvm.io), [asdf](https://asdf-vm.com), [ruby-install](https://github.com/postmodern/ruby-install) or [ruby-build](https://github.com/rbenv/ruby-build), or with `rpm` or `apt-get` on Linux (see more installation options on [ruby-lang.org](https://www.ruby-lang.org/en/downloads/)).

To use it in your app, add this to your `Gemfile`:

    gem 'tapfall'


## Usage

Create a `Tapfall::Stream` object, specifying the address of the Tap service websocket:

```rb
require 'tapfall'

tap = Tapfall::Stream.new('ws://localhost:2480')
```

You can also pass just a hostname, but then it's interpreted as HTTPS/WSS, which might not be what you want.

Next, set up event listeners to handle incoming messages and get notified of errors. Here are all the available listeners (you will need at least `on_message`):

```rb
# this gives you a parsed message object, one of the subclasses of Tapfall::TapMessage
tap.on_message { |msg| p msg }

# lifecycle events
tap.on_connecting { |url| puts "Connecting to #{url}..." }
tap.on_connect { puts "Connected" }
tap.on_disconnect { puts "Disconnected" }
tap.on_reconnect { puts "Connection lost, trying to reconnect..." }
tap.on_timeout { puts "Connection stalled, triggering a reconnect..." }

# handling errors (there's a default error handler that does exactly this)
tap.on_error { |e| puts "ERROR: #{e}" }
```

You can also call these as setters accepting a `Proc` – e.g. to disable default error handling, you can do:

```rb
tap.on_error = nil
```

When you're ready, open the connection by calling `connect`:

```rb
tap.connect
```

The `#connect` method blocks until the connection is explicitly closed with `#disconnect` from an event or interrupt handler. Tapfall & Skyfall use [EventMachine](https://github.com/eventmachine/eventmachine) under the hood, so in order to run some things in parallel, you can use e.g. `EM::PeriodicTimer`.

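As a rough sketch of that pattern, the code below counts incoming messages by type, prints a summary on a timer, and closes the connection on Ctrl-C. `EventCounter` and `wire_up` are hypothetical helpers, not part of Tapfall. The sketch assumes that `tap` is a `Tapfall::Stream`, that the `on_connect` handler runs inside the EventMachine reactor, and that EventMachine has already been loaded by the gem. The 10-second interval is arbitrary.

```rb
# Hypothetical helper (not part of Tapfall): counts messages by type
# between two periodic reports.
class EventCounter
  def initialize
    @counts = Hash.new(0)
  end

  def record(type)
    @counts[type] += 1
  end

  # Returns a one-line summary and resets the counters.
  def flush
    summary = @counts.sort_by { |type, _| type.to_s }
                     .map { |type, n| "#{type}=#{n}" }
                     .join(', ')
    @counts.clear
    summary.empty? ? 'no events' : summary
  end
end

# Assumes `tap` is a Tapfall::Stream and that EventMachine is already loaded.
def wire_up(tap, counter, interval: 10)
  tap.on_message { |msg| counter.record(msg.type) }

  # Create the timer only once, even if the stream reconnects later.
  timer = nil
  tap.on_connect do
    timer ||= EM::PeriodicTimer.new(interval) { puts counter.flush }
  end

  # Ctrl-C closes the connection, which lets the blocking #connect call return.
  trap('SIGINT') { tap.disconnect }
end

# Usage (requires the gem and a running Tap service):
#   tap = Tapfall::Stream.new('ws://localhost:2480')
#   wire_up(tap, EventCounter.new)
#   tap.connect
```
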
Tapfall also supports Skyfall's `on_raw_message` handler, but only if you use Tap in "disable acks" mode (see below), which is not recommended beyond testing, unless you're doing the acks yourself. (This is because Tapfall needs to parse the message as JSON in order to get the `id` of the event to send the "ack".)

> [!NOTE]
> Unlike the standard firehose and Jetstream, Tap streams don't have a cursor that you store and pass when reconnecting. Tap is meant to be used by only one client, and it tracks internally which events have been sent to you and which haven't. You can think about it this way: it's not a public service like Jetstream that you can share with others, it's a microservice that you run as a component of your app.


### Acks

Tap by default runs in a mode where it expects the client to send back an "ack" after receiving and processing each event. When it gets the ack, it marks the event as processed and will not send it again. If you don't send an ack, it tries to retransmit the event after a moment.

You can also run it with acks disabled, by passing a `--disable-acks` option or `TAP_DISABLE_ACKS=true` env var, in which case it will assume an event has been processed as soon as it's sent to you. This is not recommended in production, since if your process crashes while processing an event, that event will be lost (and you can't ask for an earlier cursor, because there's no cursor).

Tapfall handles the acks for you automatically. If you want it to not send acks, pass an `ack: false` option to the constructor:

```rb
tap = Tapfall::Stream.new(server, { ack: false })
```

### Password-protected access

Tap also lets you set an admin password, using the `--admin-password` option or the `TAP_ADMIN_PASSWORD` env var. This locks the stream and the API behind HTTP Basic auth (with the user `admin`). Pass the password to the Tapfall constructor like this:

```rb
tap = Tapfall::Stream.new(server, { admin_password: 'abracadabra' })
```
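
To avoid hardcoding the password, you could build the constructor options from the environment. This is only a sketch: `tap_options` is a hypothetical helper, and reusing Tap's own variable names (`TAP_ADMIN_PASSWORD`, `TAP_DISABLE_ACKS`) on the client side is an assumption made for the example, not something Tapfall does for you.

```rb
# Hypothetical helper: builds the options hash for Tapfall::Stream.new
# from environment variables (or any hash passed in for testing).
def tap_options(env = ENV)
  options = {}

  password = env['TAP_ADMIN_PASSWORD']
  options[:admin_password] = password unless password.nil? || password.empty?

  # Only skip acks if Tap itself was started with acks disabled.
  options[:ack] = false if env['TAP_DISABLE_ACKS'] == 'true'

  options
end

# Usage (requires the gem and a running Tap service):
#   tap = Tapfall::Stream.new('ws://localhost:2480', tap_options)
```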

### Processing messages

Each message passed to `on_message` is an instance of a subclass of `Tapfall::TapMessage`. The main event type is `Tapfall::RecordMessage`, which includes a record operation; you will also receive `Tapfall::IdentityMessage` events, which provide info about an account change, such as a changed handle or a migration to a new PDS. An `UnknownMessage` may be passed to you if Tap starts sending new message types that Tapfall doesn't recognize yet.

All message types share these properties:

- `type` (symbol) – the message type identifier, e.g. `:record`
- `id` (integer), aliased as `seq` – a sequential index of the message

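A handler can branch on `type` to treat each kind of message differently. The sketch below uses a `Struct` as a stand-in for the message objects so that it runs without a Tap service. `FakeMessage` and `describe_message` are made up for the example, and `:identity` is an assumed type symbol for `Tapfall::IdentityMessage` (only `:record` is confirmed above).

```rb
# Stand-in for Tapfall's message objects, with the two shared properties.
FakeMessage = Struct.new(:type, :id)

# Returns a short description of what a real handler would do.
def describe_message(msg)
  case msg.type
  when :record
    "event #{msg.id}: record operation"
  when :identity # assumed symbol for Tapfall::IdentityMessage
    "event #{msg.id}: account identity change"
  else
    # e.g. an UnknownMessage for event types added to Tap later
    "event #{msg.id}: ignored (#{msg.type})"
  end
end

puts describe_message(FakeMessage.new(:record, 1))
puts describe_message(FakeMessage.new(:something_new, 2))
```
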
The `:record` messages have an `operation` method (aliased as `op`), which returns an `Operation` object with details of a create/update/delete operation performed on a record. (For symmetry with the `Skyfall::Firehose` stream version, there's also an `operations` method which returns an array.)

An `Operation` has the following fields (also matching the API of `Skyfall::Firehose::Operation` and `Skyfall::Jetstream::Operation`):

- `repo` or `did` (string) – DID of the repository (user account)
- `collection` (string) – name of the collection / record type, e.g. `app.bsky.feed.post` for posts
- `type` (symbol) – short name of the collection, e.g. `:bsky_post`
- `rkey` (string) – identifier of a record in a collection
- `path` (string) – the path part of the at:// URI – collection name + ID (rkey) of the item
- `uri` (string) – the complete at:// URI
- `action` (symbol) – `:create`, `:update` or `:delete`
- `cid` (CID) – CID of the operation/record (`nil` for delete operations)
- `live?` (boolean) – true if the record was received from the firehose, false if it was backfilled from the repo

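To illustrate how `repo`, `collection`, `rkey`, `path` and `uri` relate to each other, here is a small stand-in object that derives the last two from the first three. `FakeOperation` is not Tapfall's class, only a sketch with the same field names, and the rkey value is invented.

```rb
# Stand-in with the same field names as an Operation (not the real class).
FakeOperation = Struct.new(:repo, :collection, :rkey) do
  # Path part of the at:// URI: collection name + record key
  def path
    "#{collection}/#{rkey}"
  end

  # Complete at:// URI: repository DID + path
  def uri
    "at://#{repo}/#{path}"
  end
end

op = FakeOperation.new('did:plc:uh4errluyq5thgszrwwrtpuq', 'app.bsky.feed.post', '3kexamplerkey')

puts op.path
puts op.uri
```
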
Create and update operations will also have an attached record (JSON object) with the details of the record itself (a post, a like, etc.). The record data is currently available as a Ruby hash via the `raw_record` property (custom types may be added in the future).

So for example, in order to filter only "create post" operations and print their details, you can do something like this:

```rb
tap.on_message do |m|
  next if m.type != :record

  m.operations.each do |op|
    next unless op.action == :create && op.type == :bsky_post

    puts "#{op.repo}:"
    puts op.raw_record['text']
    puts
  end
end
```
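
If you want to store records rather than print them, one possible shape is a store that applies each operation by its `action`. The `RecordStore` class below is a hypothetical in-memory sketch (a real app would write to a database). It treats `:create` and `:update` the same way, as an upsert keyed by `uri`, so that an event retransmitted by Tap after a missed ack doesn't produce a duplicate.

```rb
# Hypothetical in-memory store keyed by at:// URI.
class RecordStore
  attr_reader :records

  def initialize
    @records = {}
  end

  # `op` is anything with #action, #uri and #raw_record, like an Operation.
  def apply(op)
    case op.action
    when :create, :update
      @records[op.uri] = op.raw_record # upsert, safe to repeat
    when :delete
      @records.delete(op.uri)
    end
  end
end

# Usage (requires the gem and a running Tap service):
#   store = RecordStore.new
#   tap.on_message do |m|
#     store.apply(m.operation) if m.type == :record
#   end
```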

> [!NOTE]
> If you're doing a full network backfill of some `app.bsky.*` lexicons, that's going to be a *lot* of events that Tap will be sending to you, on localhost (so not limited by network bandwidth), likely in large bursts. In that case it's [recommended](https://bsky.app/profile/did:plc:ragtjsm2j2vknwkz3zp4oxrd/post/3mawmnwukws2w) to do as little processing as possible in the event handling loop, and especially to avoid any synchronous network requests there. If you're working with a limited number of repos and/or with non-Bluesky lexicons only, this is probably much less of an issue.

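One way to keep the handler light is to push the data onto a queue and do the slow work on a separate thread. The `BackgroundWorker` class below is a hypothetical sketch using Ruby's built-in `Queue`, not something Tapfall provides. Two caveats: the queue is unbounded, so it can grow during a large burst, and if Tapfall acks an event once your handler returns (an assumption, not confirmed here), an event that is still waiting in the queue when the process crashes would not be retransmitted.

```rb
# Hypothetical helper: runs a block on a background thread for every
# item pushed onto its queue.
class BackgroundWorker
  def initialize(&job)
    @queue = Queue.new
    @thread = Thread.new do
      # A nil item is the shutdown signal.
      while (item = @queue.pop)
        job.call(item)
      end
    end
  end

  def push(item)
    @queue << item
  end

  # Processes everything still queued, then stops the thread.
  def shutdown
    @queue << nil
    @thread.join
  end
end

# Usage (requires the gem and a running Tap service):
#   worker = BackgroundWorker.new { |item| save_to_database(item) } # save_to_database is yours to write
#   tap.on_message do |m|
#     next if m.type != :record
#     worker.push(uri: m.operation.uri, record: m.operation.raw_record)
#   end
```
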

### Note on custom lexicons

Note that the `Operation` objects have two properties that tell you the kind of record they're about: `#collection`, which is a string containing the official name of the collection/lexicon, e.g. `"app.bsky.feed.post"`; and `#type`, which is a symbol meant to save you some typing, e.g. `:bsky_post`.

When Tapfall receives a message about a record type that's not on its list of known types, whether in the `app.bsky` namespace or not, the operation `type` will be `:unknown`, while the `collection` will be the original string. So if an app like e.g. "Skygram" appears with a `zz.skygram.*` namespace that lets you share photos on ATProto, the operations will have a type `:unknown` and collection names like `zz.skygram.feed.photo`, and you can check the `collection` field for record types known to you and process them in some appropriate way, even if Tapfall doesn't recognize the record type.

However, do not check first whether such operations have a `type` equal to `:unknown` – just ignore the type and only check the `collection` string. The reason is that a later version might start recognizing those records and add a new `type` value for them, like e.g. `:skygram_photo`, and then they won't match your condition anymore.

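In code, the check could look like the sketch below. `skygram_photo?` is a hypothetical helper, and `zz.skygram.feed.photo` is the made-up collection from the example above.

```rb
SKYGRAM_PHOTO = 'zz.skygram.feed.photo'

# Matches on the collection string only, so it keeps working whether the
# type is reported as :unknown or, in a later version, as something specific.
def skygram_photo?(op)
  op.collection == SKYGRAM_PHOTO
end

# Fragile alternative, shown only for contrast:
#   op.type == :unknown && op.collection == SKYGRAM_PHOTO
```
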

### Reconnection logic

See the section in the [Skyfall readme](https://tangled.org/mackuba.eu/skyfall#reconnection-logic) about the options for handling reconnecting to a flaky firehose – but since in this case the Tap service will run under your control, likely on the same machine, this might not be as useful in practice.


## HTTP API

Apart from the `/channel` websocket endpoint, Tap also has [a few other endpoints](https://github.com/bluesky-social/indigo/tree/main/cmd/tap#http-api) for adding/removing repos and checking various stats.

You can call all the methods below either on the `Tapfall::Stream` instance you use for connecting to the websocket, or on a separate `Tapfall::API` object if you prefer.

Currently implemented endpoints:

### /repos/add

Tap can work in three possible ways regarding the subset of repos it tracks:

1) `TAP_FULL_NETWORK`, when it tracks *all repos everywhere*
2) `TAP_SIGNAL_COLLECTION`, when it finds and tracks all repos that have some specific types of records you're interested in
3) default mode, when it only tracks repos you've added manually

In that third mode, use this to add repos to the tracking list:

```rb
@tap.add_repo('did:plc:uh4errluyq5thgszrwwrtpuq')

# or:
@tap.add_repos(did_list)
```
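
If your DID list comes from an external source, you may want to clean it up and send it in smaller chunks. The helper below is a hypothetical sketch, not part of Tapfall. Whether Tap limits the size of a single request is not stated here, so the batch size of 100 is an arbitrary assumption.

```rb
# Hypothetical helper: `tap` is any object with an `add_repos` method
# that accepts an array of DIDs (e.g. a Tapfall::Stream).
# Returns the number of DIDs that were submitted.
def add_repos_in_batches(tap, dids, batch_size: 100)
  # Trim whitespace, drop anything that doesn't look like a DID, remove duplicates.
  valid = dids.map(&:strip).select { |did| did.start_with?('did:') }.uniq

  valid.each_slice(batch_size) { |batch| tap.add_repos(batch) }
  valid.size
end

# Usage (requires the gem and a running Tap service):
#   add_repos_in_batches(@tap, File.readlines('dids.txt'))
```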

### /repos/remove

To remove repos from the list, use:

```rb
@tap.remove_repo('did:plc:uh4errluyq5thgszrwwrtpuq')

# or:
@tap.remove_repos(did_list)
```


## Credits

Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).

The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).

Bug reports and pull requests are welcome 😎