···11+## Unreleased
22+33+The main change in this version is that inline YARD documentation has been added. This was also a good opportunity to review some APIs and tweak some things in order to get Skyfall a bit closer to a 1.0.
44+55+New APIs:
66+77+- added `Skyfall::Jetstream::CommitMessage#operation` (aliased as `op`) which returns the (always single) operation in the `operations` array
88+- added `#kind` as alias for `#type` in both `Message` classes
99+- added a base class for error types, `Skyfall::Error`
1010+- added `#blocks` to `Skyfall::Firehose::SyncMessage`
1111+- added `#rev`, `#since` and `#prev_data` to `Skyfall::Firehose::CommitMessage`
1212+1313+Deprecated & removed APIs:
1414+1515+- removed deprecated `HandleMessage` and `TombstoneMessage` message classes
1616+- removed deprecated `CommitMessage#prev`
1717+- deprecated `#path` in both `Operation` classes
1818+1919+Optimizations:
2020+2121+- much faster `Skyfall::Firehose::Message#time` parsing on Ruby 3.2+
2222+- lazy decoding of sections in `CarArchive` โ saves quite a lot of work if sections are only accessed through `Operation#raw_record`
2323+- added `frozen_string_literal: true` in all files to reduce garbage collection
2424+2525+Access level changes:
2626+2727+- restricted `Stream#start_heartbeat_timer` & `Stream#stop_heartbeat_timer` methods access to private
2828+- restricted `Stream#handle_message` method access to protected
2929+- restricted `Stream#last_update` to read-only access
3030+- restricted `#inspectable_variables` methods access to either private or protected
3131+- relaxed `Stream#build_websocket_url` & `Stream#build_websocket_client` methods access from private to protected
3232+- fixed private class method `Skyfall::Firehose::Message.decode_cbor_objects` which wasn't actually private
3333+3434+Additional validations and other changes:
3535+3636+- `Stream#connect` throws an error if neither `on_message` nor `on_raw_message` handlers have been configured
3737+- `Message` subclasses do additional checks if the fields they require to not be nil aren't nil
3838+- `Message` subclasses raise an error if `.new` is called on a subclass (and not on the base `Message`) passing the data of a wrong kind of message (instead of returning e.g. a `CommitMessage` from `AccountMessage.new` as it worked previously)
3939+- made `LabelsMessage` a subclass of `Firehose::Message`
4040+- fixed the `require`s config in some files so they can be loaded in any order
4141+4242+4343+## [0.6.1] - 2026-01-08
4444+4545+- added `:bsky_notif_declaration` shortcode for `app.bsky.notification.declaration` collection
4646+- throw error when trying to run two streams in one process (see b4a1514f5da28983205765e55724b5c4abe6c5e4 for details)
4747+- added protected `#send_data` and `#socket` methods in `Stream` for use in `Stream` subclasses (currently for the Tapfall gem)
4848+- added a way to customize headers sent when connecting in `Stream` subclasses through the `#request_headers` method
4949+5050+## [0.6.0] - 2025-06-25
5151+5252+- significantly speeded up reading of events from the binary firehose (`Skyfall::Firehose`) โ up to 4-5x faster than before
5353+- removed the `Skyfall::Stream.new` constructor deprecated in 0.5.0
5454+5555+## [0.5.1] - 2025-05-18
5656+5757+- added support for the new `#sync` message type
5858+- added `#unknown?` helper in `Skyfall::Firehose::Message` and `Skyfall::Jetstream::Message`, which returns true if the message type is `UnknownMessage`
5959+- added `:bsky_verification` and `:bsky_actor_status` lexicon type and shortcode
6060+- added one missing require
6161+6262+## [0.5.0] - 2024-11-15
6363+6464+Jetstream support! You can now connect to [Jetstream](https://github.com/bluesky-social/jetstream) sources using `Skyfall::Jetstream` (see readme).
6565+6666+This required some breaking changes in the existing API:
6767+6868+- `Skyfall::Stream` has been renamed to `Skyfall::Firehose`, `Skyfall::Stream` is now a base class of both `Firehose` and `Jetstream`; the existing `Skyfall::Stream` constructor works for now but will be removed soon
6969+- `Skyfall::WebsocketMessage` and its subclasses have been separated into two parallel families under `Skyfall::Firehose` and `Skyfall::Jetstream`, with the base classes just named `Message`
7070+- same thing happened with `Skyfall::Operation`
7171+- `data_object` and `type_object` properties in `WebsocketMessage` are considered semi-private API now ("nodoc")
7272+7373+In most cases, you should only need to update the `Skyfall::Stream` class name in the constructor. If you've referenced message classes like `Skyfall::CommitMessage` directly, it's probably better to just check the `#type` property instead.
7474+7575+Also, small change to the user agent API: `Skyfall::Stream` now has an additional metod `version_string`, which will always return `Skyfall/0.x.y` โ it's recommended to use that instead of `default_user_agent` to build your own user agent string that includes the library version. `default_user_agent` now passes through to `version_string`, but it could be changed in future to return something else.
7676+7777+## [0.4.1] - 2024-10-04
7878+7979+- performance fix โ don't decode CAR sections which aren't needed, which is most of them; this cuts the amount of memory that GC has to free up by about one third, and should speed up processing by around ~10%
8080+8181+## [0.4.0] - 2024-09-23
8282+8383+- (re)added a "hearbeat" feature (removed earlier in 0.2.0) to fix the occasional issue when the websocket stops receiving data, but doesn't disconnect (not enabled by default, turn it on by setting `check_heartbeat` to true)
8484+- added a way to set the user agent sent when connecting using the `user_agent` field (default is `"Skyfall/#{version}"`)
8585+- added `app.bsky.feed.postgate` record type
8686+8787+## [0.3.1] - 2024-06-28
8888+8989+- added `app.bsky.graph.starterpack` and `chat.bsky.actor.declaration` record types
9090+- added `#account` event type (`AccountMessage`)
9191+- added `handle` field to `IdentityMessage`
9292+- fixed param validation on `Stream` initialization
9393+- reverted the change that added Ruby stdlib dependencies explicitly to the gemspec, since this causes more problems than it's worth โ only `base64` is left there, since it's the one now required to be listed
9494+9595+## [0.3.0] - 2024-03-21
9696+9797+- added support for labeller firehose, served by labeller services at the `com.atproto.label.subscribeLabels` endpoint (aliased as `:subscribe_labels`)
9898+- the `#labels` messages from the labeller firehose are parsed into a `LabelsMessage`, which includes a `labels` array of `Label` objects
9999+- `Stream` callbacks can now also be assigned via setters, e.g. `stream.on_message = proc { ... }`
100100+- added default error handler to `Stream` which logs the error to `$stdout` โ set `stream.on_error = nil` to disable
101101+- added Ruby stdlib dependencies explicitly to the gemspec โ fixes a warning in Ruby 3.3 when requiring `base64`, which will be extracted as an optional gem in 3.4
102102+103103+## [0.2.5] - 2024-03-14
104104+105105+- added `:bsky_labeler` record type symbol & collection constant
106106+107107+## [0.2.4] - 2024-02-27
108108+109109+- added support for `#identity` message type
110110+- added `Operation#did` as an alias of `#repo`
111111+- added `Stream#reconnect` method which forces the websocket to reconnect
112112+- added some validation for the `cursor` parameter in `Stream` initializer
113113+- the `server` parameter in `Stream` initializer can be a full URL with scheme, which lets you connect to e.g. `ws://localhost` (since by default, `wss://` is used)
114114+- tweaked `#inspect` output of `Stream` and `Operation`
115115+116116+## [0.2.3] - 2023-09-28
117117+118118+- fixed encoding of image CIDs again (they should be wrapped in a `$link` object)
119119+- binary strings are now correctly returned as `$bytes` objects
120120+- added `list`, `listblock` and `threadgate` to record type symbols and collection constants
121121+1122## [0.2.2] - 2023-09-06
21233124- fixed image CIDs returned in the record JSON as CBOR tag objects (they are now returned decoded to the string form)
41255126## [0.2.1] - 2023-08-19
612777-- optimized `WebsocketMessage` parsing performance - lazy parsing of most properties (message decoding should be over 50% faster on average)
128128+- optimized `WebsocketMessage` parsing performance โ lazy parsing of most properties (message decoding should be over 50% faster on average)
8129- added separate subclasses of `WebsocketMessage` for different message types
9130- added support for `#handle`, `#info` and `#tombstone` message types
10131- `UnknownMessage` is returned for unrecognized message types
···12133## [0.2.0] - 2023-07-24
1313414135- switched the websocket library from `websocket-client-simple` to `faye-websocket`, which should make event parsing up to ~30ร faster (!)
1515-- added `auto_reconnect` property to `Stream` (on by default) - if true, it will try to reconnect with an exponential backoff when the websocket disconnects, until you call `Stream#disconnect`
136136+- added `auto_reconnect` property to `Stream` (on by default) โ if true, it will try to reconnect with an exponential backoff when the websocket disconnects, until you call `Stream#disconnect`
1613717138Note:
181391919-- calling `sleep` is no longer needed after connecting - call `connect` on a new thread instead to get previously default behavior of running the event loop asynchronously
140140+- calling `sleep` is no longer needed after connecting โ call `connect` on a new thread instead to get previously default behavior of running the event loop asynchronously
20141- the disconnect event no longer passes an error object in the argument
2121-- there is currently no "heartbeat" feature as in 0.1.x that checks for a stuck connection - but it doesn't seem to be needed
142142+- there is currently no "heartbeat" feature as in 0.1.x that checks for a stuck connection โ but it doesn't seem to be needed
2214323144## [0.1.3] - 2023-07-04
24145
···11The zlib License
2233-Copyright (c) 2023 Jakub Suder
33+Copyright (c) 2026 Jakub Suder
4455This software is provided 'as-is', without any express or implied
66warranty. In no event will the authors be held liable for any damages
+240-23
README.md
···11# Skyfall
2233-๐ค A Ruby gem for streaming data from the Bluesky/AtProto firehose ๐ฆ
33+A Ruby gem for streaming data from the Bluesky/ATProto firehose ๐ฆ
44+55+> [!NOTE]
66+> Part of ATProto Ruby SDK: [ruby.sdk.blue](https://ruby.sdk.blue)
475869## What does it do
71088-Skyfall is a Ruby library for connecting to the *"firehose"* of the Bluesky social network, i.e. a websocket which
99-streams all new posts and everything else happening on the Bluesky network in real time. The code connects to the
1010-websocket endpoint, decodes the messages which are encoded in some binary formats like DAG-CBOR, and returns the data as Ruby objects, which you can filter and save to some kind of database (e.g. in order to create a custom feed).
1111+Skyfall is a Ruby library for connecting to the *"[firehose](https://atproto.com/specs/event-stream)"* of the Bluesky social network, i.e. a websocket which streams all new posts and everything else happening on the Bluesky network in real time. The code connects to the websocket endpoint, decodes the messages which are encoded in some binary formats like DAG-CBOR, and returns the data as Ruby objects, which you can filter and save to some kind of database (e.g. in order to create a custom feed).
1212+1313+Since version 0.5, Skyfall also supports connecting to [Jetstream](https://github.com/bluesky-social/jetstream/) sources, which serve the same kind of stream, but as JSON messages instead of CBOR.
111412151316## Installation
14171515- gem install skyfall
1818+To use Skyfall, you need a reasonably new version of Ruby โ it should run on Ruby 2.6 and above, although it's recommended to use a version that's still getting maintainance updates, i.e. currently 3.2+. A compatible version should be preinstalled on macOS Big Sur and above and on many Linux systems. Otherwise, you can install one using tools such as [RVM](https://rvm.io), [asdf](https://asdf-vm.com), [ruby-install](https://github.com/postmodern/ruby-install) or [ruby-build](https://github.com/rbenv/ruby-build), or `rpm` or `apt-get` on Linux (see more installation options on [ruby-lang.org](https://www.ruby-lang.org/en/downloads/)).
1919+2020+To install the gem, run the command:
2121+2222+ [sudo] gem install skyfall
2323+2424+Or add this to your app's `Gemfile`:
2525+2626+ gem 'skyfall', '~> 0.6'
162717281829## Usage
19302020-Start a connection to the firehose by creating a `Skyfall::Stream` object, passing the server hostname and endpoint name:
3131+### Standard ATProto firehose
3232+3333+To connect to the firehose, start by creating a `Skyfall::Firehose` object, specifying the server hostname and endpoint name:
21342235```rb
2336require 'skyfall'
24372525-sky = Skyfall::Stream.new('bsky.social', :subscribe_repos)
3838+sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)
2639```
27402828-Add event listeners to handle incoming messages and get notified of errors:
4141+The server name can be just a hostname, or a full URL with a `ws:` or `wss:` scheme, which is useful if you want to use a non-encrypted websocket connection, e.g. `"ws://localhost:8000"`. The endpoint can be either a full NSID string like `"com.atproto.sync.subscribeRepos"`, or one of the defined symbol shortcuts - you will almost always want to pass `:subscribe_repos` here.
4242+4343+Next, set up event listeners to handle incoming messages and get notified of errors. Here are all the available listeners (you will need at least either `on_message` or `on_raw_message`):
29443045```rb
4646+# this gives you a parsed message object, one of subclasses of Skyfall::Firehose::Message
4747+sky.on_message { |msg| p msg }
4848+4949+# this gives you raw binary data as received from the websocket
5050+sky.on_raw_message { |data| p data }
5151+5252+# lifecycle events
5353+sky.on_connecting { |url| puts "Connecting to #{url}..." }
3154sky.on_connect { puts "Connected" }
3255sky.on_disconnect { puts "Disconnected" }
5656+sky.on_reconnect { puts "Connection lost, trying to reconnect..." }
5757+sky.on_timeout { puts "Connection stalled, triggering a reconnect..." }
33583434-sky.on_message { |m| p m }
5959+# handling errors (there's a default error handler that does exactly this)
3560sky.on_error { |e| puts "ERROR: #{e}" }
6161+```
6262+6363+You can also call these as setters accepting a `Proc` - e.g. to disable default error handling, you can do:
6464+6565+```rb
6666+sky.on_error = nil
3667```
37683869When you're ready, open the connection by calling `connect`:
···4172sky.connect
4273```
43747575+The `#connect` method blocks until the connection is explicitly closed with `#disconnect` from an event or interrupt handler. Skyfall uses [EventMachine](https://github.com/eventmachine/eventmachine) under the hood, so in order to run some things in parallel, you can use e.g. `EM::PeriodicTimer`.
7676+7777+7878+### Using a Jetstream source
7979+8080+Alternatively, you can connect to a [Jetstream](https://github.com/bluesky-social/jetstream/) server. Jetstream is a firehose proxy that lets you stream data as simple JSON instead, which uses much less bandwidth, and allows you to pick only a subset of events that you're interested in, e.g. only posts or only from specific accounts. (See the [configuration section](#jetstream-filters) for more info on Jetstream filtering.)
8181+8282+Jetstream connections are made using a `Skyfall::Jetstream` instance, which has more or less the same API as `Skyfall::Firehose`, so it should be possible to switch between those by just changing the line that creates the client instance:
8383+8484+```rb
8585+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network')
8686+8787+sky.on_message { |msg| ... }
8888+sky.on_error { |e| ... }
8989+sky.on_connect { ... }
9090+...
9191+9292+sky.connect
9393+```
9494+9595+### Cursors
9696+9797+ATProto websocket endpoints implement a "*cursor*" feature to help you make sure that you don't miss anything if your connection is down for a bit (because of a network issue, server restart, deploy etc.). Each message includes a `seq` field, which is the sequence number of the event. You can keep track of the last seq you've seen, and when you reconnect, you pass that number as a cursor parameter - the server will then "replay" all events you might have missed since that last one. (The `bsky.network` Relay firehose currently has a buffer of about 72 hours, though that's not something required by specification.)
9898+9999+To use a cursor when connecting to the firehose, pass it as the third parameter to `Skyfall::Firehose`. You should then regularly save the `seq` of the last event to some permanent storage, and then load it from there when reconnecting.
100100+101101+A full-network firehose sends many hundreds of events per second, so depending on your use case, it might be enough if you save it every n events (e.g. every 100 or 1000) and on clean shutdown:
102102+103103+```rb
104104+cursor = load_cursor
105105+106106+sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos, cursor)
107107+sky.on_message do |msg|
108108+ save_cursor(msg.seq) if msg.seq % 1000 == 0
109109+ process_message(msg)
110110+end
111111+```
112112+113113+Jetstream has a similar mechanism, except the cursor is the event's timestamp in Unix time microseconds instead of just a number incrementing by 1. For `Skyfall::Jetstream`, pass the cursor as a key in an options hash:
114114+115115+```rb
116116+cursor = load_cursor
117117+118118+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', { cursor: cursor })
119119+sky.on_message do |msg|
120120+ save_cursor(msg.seq)
121121+ process_message(msg)
122122+end
123123+```
124124+4412545126### Processing messages
461274747-Each message passed to `on_message` is an instance of the `WebsocketMessage` class and has such properties:
128128+Each message passed to `on_message` is an instance of a subclass of either `Skyfall::Firehose::Message` or `Skyfall::Jetstream::Message`, depending on the selected source. The supported message types are:
129129+130130+- `CommitMessage` (`#commit`) - represents a change in a user's repo; most messages are of this type
131131+- `IdentityMessage` (`#identity`) - notifies about a change in user's DID document, e.g. a handle change or a migration to a new PDS
132132+- `AccountMessage` (`#account`) - notifies about a change of an account's status (de/activation, suspension, deletion)
133133+- `SyncMessage` (`#sync`) - updates repository state, can be used to trigger account resynchronization
134134+- `LabelsMessage` (`#labels`) - only used in `subscribe_labels` endpoint
135135+- `InfoMessage` (`#info`) - a protocol error message, e.g. about an invalid cursor parameter
136136+- `UnknownMessage` is used for other unrecognized message types
137137+138138+`Skyfall::Firehose::Message` and `Skyfall::Jetstream::Message` variants of message classes should have more or less the same interface, except when a given field is not included in one of the formats.
139139+140140+All message objects have the following shared properties:
141141+142142+- `type` (symbol) - the message type identifier, e.g. `:commit`
143143+- `seq` (integer) - a sequential index of the message; Jetstream messages instead have a `time_us` value, which is a Unix timestamp in microseconds (also aliased as `seq` for compatibility)
144144+- `repo` or `did` (string) - DID of the repository (user account)
145145+- `time` (Time) - timestamp of the described action
481464949-- `type` (symbol) - usually `:commit`
5050-- `seq` (sequential number)
5151-- `time` (Time)
5252-- `repo` (string) - DID of the repository (user account)
147147+All properties except `type` may be nil for some message types that aren't related to a specific user, like `#info`.
148148+149149+Commit messages additionally have:
150150+53151- `commit` - CID of the commit
5454-- `prev` - CID of the previous commit in that repo
55152- `operations` - list of operations (usually one)
561535757-Operations are objects of type `Operation` and have such properties:
154154+Handle and Identity messages additionally have:
155155+156156+- `handle` - the new handle assigned to the DID
157157+158158+Account messages additionally have:
159159+160160+- `active?` - whether the account is active, or inactive for any reason
161161+- `status` - if not active, shows the status of the account (`:deactivated`, `:deleted`, `:takendown`)
162162+163163+Info messages additionally have:
164164+165165+- `name` - identifier of the message/error
166166+- `message` - a human-readable description
167167+581685959-- `repo` (string) - DID of the repository (user account)
169169+### Commit operations
170170+171171+Operations are objects of type `Skyfall::Firehose::Operation` or `Skyfall::Jetstream::Operation` and have such properties:
172172+173173+- `repo` or `did` (string) - DID of the repository (user account)
60174- `collection` (string) - name of the relevant collection in the repository, e.g. `app.bsky.feed.post` for posts
175175+- `type` (symbol) - short name of the collection, e.g. `:bsky_post`
176176+- `rkey` (string) - identifier of a record in a collection
61177- `path` (string) - the path part of the at:// URI - collection name + ID (rkey) of the item
178178+- `uri` (string) - the complete at:// URI
62179- `action` (symbol) - `:create`, `:update` or `:delete`
6363-- `uri` (string) - the at:// URI
6464-- `type` (symbol) - short name of the collection, e.g. `:bsky_post`
6565-- `cid` - CID of the operation/record (`nil` for delete operations)
180180+- `cid` (CID) - CID of the operation/record (`nil` for delete operations)
661816767-Create and update operations will also have an attached record (JSON object) with details of the post, like etc. The record data is currently available as a Ruby hash via `raw_record` property (custom types will be added in a later version).
182182+Create and update operations will also have an attached record (JSON object) with details of the post, like etc. The record data is currently available as a Ruby hash via `raw_record` property (custom types will be added in future).
6818369184So for example, in order to filter only "create post" operations and print their details, you can do something like this:
70185···82197end
83198```
841998585-See complete example in [example/firehose.rb](https://github.com/mackuba/skyfall/blob/master/example/firehose.rb).
200200+For more examples, see the [examples page](https://ruby.sdk.blue/examples/) on [ruby.sdk.blue](https://ruby.sdk.blue), or the [bluesky-feeds-rb](https://tangled.org/mackuba.eu/bluesky-feeds-rb/blob/master/app/firehose_stream.rb) project, which implements a feed generator service.
201201+202202+203203+### Note on custom lexicons
204204+205205+Note that the `Operation` objects have two properties that tell you the kind of record they're about: `#collection`, which is a string containing the official name of the collection/lexicon, e.g. `"app.bsky.feed.post"`; and `#type`, which is a symbol meant to save you some typing, e.g. `:bsky_post`.
206206+207207+When Skyfall receives a message about a record type that's not on the list, whether in the `app.bsky` namespace or not, the operation `type` will be `:unknown`, while the `collection` will be the original string. So if an app like e.g. "Skygram" appears with a `zz.skygram.*` namespace that lets you share photos on ATProto, the operations will have a type `:unknown` and collection names like `zz.skygram.feed.photo`, and you can check the `collection` field for record types known to you and process them in some appropriate way, even if Skyfall doesn't recognize the record type.
208208+209209+Do not however check if such operations have a `type` equal to `:unknown` first - just ignore the type and only check the `collection` string. The reason is that some next version of Skyfall might start recognizing those records and add a new `type` value for them like e.g. `:skygram_photo`, and then they won't match your condition anymore.
210210+211211+212212+## Reconnection logic
213213+214214+In a perfect world, the websocket would never disconnect until you disconnect it, but unfortunately we don't live in a perfect world. The socket sometimes disconnects or stops responding, and Skyfall has some built-in protections to make sure it can operate without much oversight.
215215+216216+217217+### Broken connections
218218+219219+If the connection is randomly closed for some reason, Skyfall will by default try to reconnect automatically. If the reconnection fails (e.g. because the network is down), it will wait with an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) up to 5 minute intervals and keep retrying forever until it connects again. The `on_reconnect` callback is triggered when the connection is closed (before the wait delay). This mechanism should generally solve most of the problem.
220220+221221+The auto reconnecting feature is enabled by default, but you can turn it off by setting `auto_reconnect` to `false`.
222222+223223+### Stalled connections & heartbeat
224224+225225+Occasionally, especially during times of very heavy traffic, the websocket can get into a stuck state where it stops receiving any data, but doesn't disconnect and just hangs like this forever. To work around this, there is a "heartbeat" feature which starts a background timer, which periodically checks how much time has passed since the last received event, and if the time exceeds a set limit, it manually disconnects and reconnects the stream.
226226+227227+This feature is not enabled by default, because there are some firehoses which will not be sending events often, possibly only once in a while โ e.g. labellers and independent PDS firehoses โ and in this case we don't want any heartbeat since it will be completely normal not to have any events for a long time. It's not really possible to detect easily if we're connecting to a full network relay or one of those, so in order to avoid false alarms, you need to enable this manually using the `check_heartbeat` property.
228228+229229+You can also change the `heartbeat_interval`, i.e. how often the timer is triggered (default: 10s), and the `heartbeat_timeout`, i.e. the amount of time passed without events needed to cause a reconnect (default: 5 min):
230230+231231+```rb
232232+sky.check_heartbeat = true
233233+sky.heartbeat_interval = 5
234234+sky.heartbeat_timeout = 120
235235+```
236236+237237+### Cursors when reconnecting
238238+239239+Skyfall keeps track of the last event's `seq` internally in the `cursor` property, so if the client reconnects for whatever reason, it will automatically use the latest cursor in the URL.
240240+241241+> [!NOTE]
242242+> This only happens if you use the `on_message` callback and not `on_raw_message`, since the event is not parsed from binary data into a `Message` object if you use `on_raw_message`, so Skyfall won't have access to the `seq` field then.
243243+244244+245245+## Streaming from labellers
246246+247247+Apart from `subscribe_repos`, there is a second endpoint `subscribe_labels`, which is used to stream labels from [labellers](https://atproto.com/specs/label) (ATProto moderation services). This endpoint only sends `#labels` events (and possibly `#info`).
248248+249249+To connect to a labeller, pass `:subscribe_labels` as the endpoint name to `Skyfall::Firehose`. The `on_message` callback will get called with `Skyfall::Firehose::LabelsMessage` events, each of which includes one or more labels as `Skyfall::Label`:
250250+251251+```rb
252252+cursor = load_cursor(service)
253253+sky = Skyfall::Firehose.new(service, :subscribe_labels, cursor)
254254+sky.on_message do |msg|
255255+ if msg.type == :labels
256256+ msg.labels.each do |l|
257257+ puts "[#{l.created_at}] #{l.subject} => #{l.value}"
258258+ end
259259+ end
260260+end
261261+```
262262+263263+See [ATProto label docs](https://atproto.com/specs/label) for info on what fields are included with each label - `Skyfall::Label` includes properties with these original names, and also more friendly aliases for each (e.g. `value` instead of `val`).
264264+265265+266266+## Other configuration
267267+268268+### User agent
269269+270270+Skyfall sends a user agent header when making a connection. This is set by default to `"Skyfall/0.x.y"`, but it's recommended that you override it using the `user_agent` field to something that identifies your app and its author โ this will let the owner of the server you're connecting to know who to contact in case the client is causing some problems.
271271+272272+You can also append your user agent info to the default value like this:
273273+274274+```rb
275275+sky.user_agent = "NewsBot (@news.bot) #{sky.version_string}"
276276+```
277277+278278+### Jetstream filters
279279+280280+Jetstream allows you to specify [filters](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#consuming-jetstream) of collection types and/or tracked DIDs when you connect, so it will send you only the events you're interested in. You can e.g. ask only for posts and ignore likes, or only profile events and ignore everything else, or only listen for posts from a few specific accounts.
281281+282282+To use these filters, pass the "wantedCollections" and/or "wantedDids" parameters in the options hash when initializing `Skyfall::Jetstream`. You can use the original JavaScript param names, or a more Ruby-like snake_case form:
283283+284284+```rb
285285+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
286286+ wanted_collections: 'app.bsky.feed.post',
287287+ wanted_dids: @dids
288288+})
289289+```
290290+291291+For collections, you can also use the symbol codes used in `Operation#type`, e.g. `:bsky_post`:
292292+293293+```rb
294294+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
295295+ wanted_collections: [:bsky_post]
296296+})
297297+```
298298+299299+See [Jetstream docs](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#consuming-jetstream) for more info on available filters.
300300+301301+> [!NOTE]
302302+> The `compress` and `requireHello` options (and zstd compression) are not available at the moment. Also the "subscriber sourced messages" aren't implemented yet.
863038730488305## Credits
893069090-Copyright ยฉ 2023 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/mackuba.eu)).
307307+Copyright ยฉ 2026 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).
9130892309The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
93310
···11+# frozen_string_literal: true
22+13module Skyfall
44+55+ #
66+ # This module defines constants for known Bluesky record collection types, and a mapping of those
77+ # names to symbol short codes which can be used as shorthand when processing events or in
88+ # Jetstream filters.
99+ #
1010+211 module Collection
33- BSKY_POST = "app.bsky.feed.post"
44- BSKY_LIKE = "app.bsky.feed.like"
55- BSKY_FOLLOW = "app.bsky.graph.follow"
66- BSKY_REPOST = "app.bsky.feed.repost"
77- BSKY_BLOCK = "app.bsky.graph.block"
88- BSKY_PROFILE = "app.bsky.actor.profile"
99- BSKY_LISTITEM = "app.bsky.graph.listitem"
1010- BSKY_FEED = "app.bsky.feed.generator"
1212+ BSKY_PROFILE = "app.bsky.actor.profile"
1313+ BSKY_ACTOR_STATUS = "app.bsky.actor.status"
1414+ BSKY_FEED = "app.bsky.feed.generator"
1515+ BSKY_LIKE = "app.bsky.feed.like"
1616+ BSKY_POST = "app.bsky.feed.post"
1717+ BSKY_POSTGATE = "app.bsky.feed.postgate"
1818+ BSKY_REPOST = "app.bsky.feed.repost"
1919+ BSKY_THREADGATE = "app.bsky.feed.threadgate"
2020+ BSKY_BLOCK = "app.bsky.graph.block"
2121+ BSKY_FOLLOW = "app.bsky.graph.follow"
2222+ BSKY_LIST = "app.bsky.graph.list"
2323+ BSKY_LISTBLOCK = "app.bsky.graph.listblock"
2424+ BSKY_LISTITEM = "app.bsky.graph.listitem"
2525+ BSKY_STARTERPACK = "app.bsky.graph.starterpack"
2626+ BSKY_VERIFICATION = "app.bsky.graph.verification"
2727+ BSKY_LABELER = "app.bsky.labeler.service"
2828+2929+ BSKY_NOTIF_DECLARATION = "app.bsky.notification.declaration"
3030+ BSKY_CHAT_DECLARATION = "chat.bsky.actor.declaration"
3131+3232+ # Mapping of NSID collection names to symbol short codes
3333+3434+ SHORT_CODES = {
3535+ BSKY_ACTOR_STATUS => :bsky_actor_status,
3636+ BSKY_BLOCK => :bsky_block,
3737+ BSKY_FEED => :bsky_feed,
3838+ BSKY_FOLLOW => :bsky_follow,
3939+ BSKY_LABELER => :bsky_labeler,
4040+ BSKY_LIKE => :bsky_like,
4141+ BSKY_LIST => :bsky_list,
4242+ BSKY_LISTBLOCK => :bsky_listblock,
4343+ BSKY_LISTITEM => :bsky_listitem,
4444+ BSKY_POST => :bsky_post,
4545+ BSKY_POSTGATE => :bsky_postgate,
4646+ BSKY_PROFILE => :bsky_profile,
4747+ BSKY_REPOST => :bsky_repost,
4848+ BSKY_STARTERPACK => :bsky_starterpack,
4949+ BSKY_THREADGATE => :bsky_threadgate,
5050+ BSKY_VERIFICATION => :bsky_verification,
5151+ BSKY_CHAT_DECLARATION => :bsky_chat_declaration,
5252+ BSKY_NOTIF_DECLARATION => :bsky_notif_declaration
5353+ }
5454+5555+ # Returns a symbol short code for a given collection NSID, or `:unknown`
5656+ # if NSID is not on the list.
5757+ # @param collection [String] collection NSID
5858+ # @return [Symbol] short code or :unknown
5959+6060+ def self.short_code(collection)
6161+ SHORT_CODES[collection] || :unknown
6262+ end
6363+6464+ # Returns a collection NSID assigned to a given short code symbol, if one is defined.
6565+ # @param code [Symbol] one of the symbols listed in {SHORT_CODES}
6666+ # @return [String, nil] assigned NSID string, or nil when code is not known
6767+6868+ def self.from_short_code(code)
6969+ SHORT_CODES.detect { |k, v| v == code }&.first
7070+ end
1171 end
1272end
+58-4
lib/skyfall/errors.rb
···11+# frozen_string_literal: true
22+13module Skyfall
22- class DecodeError < StandardError
44+ #
55+ # Wrapper base class for Skyfall error classes.
66+ #
77+ class Error < StandardError
38 end
4955- class UnsupportedError < StandardError
1010+ #
1111+ # Raised when some code is not configured or configured incorrectly.
1212+ #
1313+ class ConfigError < Error
614 end
71588- class SubscriptionError < StandardError
99- attr_reader :error_type, :error_message
1616+ #
1717+ # Raised when some part of the message being decoded has invalid format.
1818+ #
1919+ class DecodeError < Error
2020+ end
10212222+ #
2323+ # Raised when {Stream#connect} is called and there's already another instance of {Stream} or its
2424+ # subclass like {Firehose} that's connected to another websocket.
2525+ #
2626+ # This is currently not supported in Skyfall, because it uses EventMachine behind the scenes, which
2727+ # runs everything on a single "reactor" thread, and there can be only one such reactor thread in
2828+ # a given process. In theory, it should be possible for two connections to run inside a single
2929+ # shared EventMachine event loop, but it would require some more coordination and it might have
3030+ # unexpected side effects - e.g. synchronous work (including I/O and network requests) done during
3131+ # processing of an event from one connection would be blocking the other connection.
3232+ #
3333+ class ReactorActiveError < Error
3434+ def initialize
3535+ super(
3636+ "An EventMachine reactor thread is already running, but it seems to have been launched by another Stream. " +
3737+ "Skyfall doesn't currently support running two different Stream instances in a single process."
3838+ )
3939+ end
4040+ end
4141+4242+ #
4343+ # Raised when the server sends a message which is formatted correctly, but describes some kind of
4444+ # error condition that the server has detected.
4545+ #
4646+ class SubscriptionError < Error
4747+4848+ # @return [String] a short machine-readable error code
4949+ attr_reader :error_type
5050+5151+ # @return [String] a human-readable error message
5252+ attr_reader :error_message
5353+5454+ #
5555+ # @param error_type [String] a short machine-readable error code
5656+ # @param error_message [String, nil] a human-readable error message
5757+ #
1158 def initialize(error_type, error_message = nil)
1259 @error_type = error_type
1360 @error_message = error_message
14611562 super("Subscription error: #{error_type}" + (error_message ? " (#{error_message})" : ""))
1663 end
6464+ end
6565+6666+ #
6767+ # Raised when the server sends a message which is formatted correctly, but written in a version
6868+ # that's not supported by this library.
6969+ #
7070+ class UnsupportedError < Error
1771 end
1872end
+19
lib/skyfall/events.rb
···11+# frozen_string_literal: true
22+33+module Skyfall
44+55+ # @private
66+ module Events
77+ protected
88+99+ def event_handler(name)
1010+ define_method("on_#{name}") do |&block|
1111+ @handlers[name.to_sym] = block
1212+ end
1313+1414+ define_method("on_#{name}=") do |block|
1515+ @handlers[name.to_sym] = block
1616+ end
1717+ end
1818+ end
1919+end
···11+# frozen_string_literal: true
22+33+require_relative '../firehose'
44+require_relative 'message'
55+66+module Skyfall
77+88+ #
99+ # Firehose message sent when the status of an account changes. This can be:
1010+ #
1111+ # - an account being created, sending its initial state (should be active)
1212+ # - an account being deactivated or suspended
1313+ # - an account being restored back to an active state from deactivation/suspension
1414+ # - an account being deleted (the status returning `:deleted`)
1515+ #
1616+1717+ class Firehose::AccountMessage < Firehose::Message
1818+1919+ #
2020+ # @private
2121+ # @param type_object [Hash] first decoded CBOR frame with metadata
2222+ # @param data_object [Hash] second decoded CBOR frame with payload
2323+ # @raise [DecodeError] if the message doesn't include required data
2424+ #
2525+ def initialize(type_object, data_object)
2626+ super
2727+ check_if_not_nil 'seq', 'did', 'time', 'active'
2828+2929+ @active = @data_object['active']
3030+ @status = @data_object['status']&.to_sym
3131+ end
3232+3333+ # @return [Boolean] true if the account is active, false if it's deactivated/suspended etc.
3434+ def active?
3535+ @active
3636+ end
3737+3838+ # @return [Symbol, nil] for inactive accounts, specifies the exact state; nil for active accounts
3939+ attr_reader :status
4040+ end
4141+end
+69
lib/skyfall/firehose/commit_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../car_archive'
44+require_relative '../cid'
55+require_relative '../firehose'
66+require_relative 'message'
77+require_relative 'operation'
88+99+module Skyfall
1010+1111+ #
1212+ # Firehose message which includes one or more operations on records in the repo (a record was
1313+ # created, updated or deleted). In most cases this is a single record operation.
1414+ #
1515+ # Most of the messages received from the firehose are of this type, and this is the type you
1616+ # will usually be most interested in.
1717+ #
1818+1919+ class Firehose::CommitMessage < Firehose::Message
2020+2121+ #
2222+ # @private
2323+ # @param type_object [Hash] first decoded CBOR frame with metadata
2424+ # @param data_object [Hash] second decoded CBOR frame with payload
2525+ # @raise [DecodeError] if the message doesn't include required data
2626+ #
2727+ def initialize(type_object, data_object)
2828+ super
2929+ check_if_not_nil 'seq', 'repo', 'commit', 'blocks', 'ops', 'time', 'rev'
3030+ end
3131+3232+ # @return [String] current revision of the repo
3333+ def rev
3434+ @data_object['rev']
3535+ end
3636+3737+ # @return [String, nil] revision of the previous commit in the repo
3838+ def since
3939+ @data_object['since']
4040+ end
4141+4242+ # @return [CID, nil] CID (Content Identifier) of data of the previous commit in the repo
4343+ def prev_data
4444+ @prev_data ||= CID.from_cbor_tag(@data_object['prevData'])
4545+ end
4646+4747+ # @return [CID] CID (Content Identifier) of the commit
4848+ def commit
4949+ @commit ||= CID.from_cbor_tag(@data_object['commit'])
5050+ end
5151+5252+ # @return [Skyfall::CarArchive] commit data in the form of a parsed CAR archive
5353+ def blocks
5454+ @blocks ||= CarArchive.new(@data_object['blocks'])
5555+ end
5656+5757+ # @return [Array<Firehose::Operation>] record operations (usually one) included in the commit
5858+ def operations
5959+ @operations ||= @data_object['ops'].map { |op| Firehose::Operation.new(self, op) }
6060+ end
6161+6262+ # Looks up record data assigned to a given operation in the commit's CAR archive.
6363+ # @param op [Firehose::Operation]
6464+ # @return [Hash, nil]
6565+ def raw_record_for_operation(op)
6666+ op.cid && blocks.section_with_cid(op.cid)
6767+ end
6868+ end
6969+end
+37
lib/skyfall/firehose/identity_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../firehose'
44+require_relative 'message'
55+66+module Skyfall
77+88+ #
99+ # Firehose message sent when a new DID is created or when the details of someone's DID document
1010+ # are changed (usually either a handle change or a migration to a different PDS). The message
1111+ # may include currently assigned handle, though it's not required that this field is set.
1212+ #
1313+ # Note: the message is originally emitted from the account's PDS and is passed as is by relays,
1414+ # which means you can't fully trust that the handle is actually correctly assigned to the DID
1515+ # and verified by DNS or well-known. To confirm that, use `DID.resolve_handle` from
1616+ # [DIDKit](https://ruby.sdk.blue/didkit/).
1717+ #
1818+1919+ class Firehose::IdentityMessage < Firehose::Message
2020+2121+ #
2222+ # @private
2323+ # @param type_object [Hash] first decoded CBOR frame with metadata
2424+ # @param data_object [Hash] second decoded CBOR frame with payload
2525+ # @raise [DecodeError] if the message doesn't include required data
2626+ #
2727+ def initialize(type_object, data_object)
2828+ super
2929+ check_if_not_nil 'seq', 'did', 'time'
3030+3131+ @handle = @data_object['handle']
3232+ end
3333+3434+ # @return [String, nil] current handle assigned to the DID
3535+ attr_reader :handle
3636+ end
3737+end
+58
lib/skyfall/firehose/info_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../firehose'
44+require_relative 'message'
55+66+module Skyfall
77+88+ #
99+ # An informational firehose message from the websocket service itself, unrelated to any repos.
1010+ #
1111+ # Currently there is only one type of message defined, `"OutdatedCursor"`, which is sent when
1212+ # the client connects with a cursor that is older than the oldest event currently kept in the
1313+ # backfill buffer. This message means that you're likely missing some events that were sent
1414+ # since the last time the client was connected but which were already deleted from the buffer.
1515+ #
1616+ # Note: the {#did}, {#seq} and {#time} properties are always `nil` for `#info` messages.
1717+ #
1818+1919+ class Firehose::InfoMessage < Firehose::Message
2020+2121+ # @return [String] short machine-readable code of the info message
2222+ attr_reader :name
2323+2424+ # @return [String, nil] a human-readable description
2525+ attr_reader :message
2626+2727+ # Message which means that the cursor passed when connecting is older than the oldest event
2828+ # currently kept in the backfill buffer, and that you've likely missed some events that have
2929+ # already been deleted
3030+ OUTDATED_CURSOR = "OutdatedCursor"
3131+3232+ #
3333+ # @private
3434+ # @param type_object [Hash] first decoded CBOR frame with metadata
3535+ # @param data_object [Hash] second decoded CBOR frame with payload
3636+ # @raise [DecodeError] if the message doesn't include required data
3737+ #
3838+ def initialize(type_object, data_object)
3939+ super
4040+ check_if_not_nil 'name'
4141+4242+ @name = @data_object['name']
4343+ @message = @data_object['message']
4444+ end
4545+4646+ # @return [String] a formatted summary
4747+ def to_s
4848+ (@name || "InfoMessage") + (@message ? ": #{@message}" : "")
4949+ end
5050+5151+ protected
5252+5353+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
5454+ def inspectable_variables
5555+ super - [:@did, :@seq]
5656+ end
5757+ end
5858+end
+41
lib/skyfall/firehose/labels_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../firehose'
44+require_relative '../label'
55+require_relative 'message'
66+77+module Skyfall
88+99+ #
1010+ # A message which includes one or more labels (as {Skyfall::Label}). This type of message
1111+ # is only sent from a `:subscribe_labels` firehose from a labeller service.
1212+ #
1313+ # Note: the {#did} and {#time} properties are always `nil` for `#labels` messages.
1414+ #
1515+1616+ class Firehose::LabelsMessage < Firehose::Message
1717+1818+ # @return [Array<Skyfall::Label>] labels included in the batch
1919+ attr_reader :labels
2020+2121+ #
2222+ # @private
2323+ # @param type_object [Hash] first decoded CBOR frame with metadata
2424+ # @param data_object [Hash] second decoded CBOR frame with payload
2525+ # @raise [DecodeError] if the message doesn't include required data
2626+ #
2727+ def initialize(type_object, data_object)
2828+ super
2929+ check_if_not_nil 'seq', 'labels'
3030+3131+ @labels = @data_object['labels'].map { |x| Label.new(x) }
3232+ end
3333+3434+ protected
3535+3636+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
3737+ def inspectable_variables
3838+ super - [:@did]
3939+ end
4040+ end
4141+end
+213
lib/skyfall/firehose/message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../errors'
44+require_relative '../extensions'
55+require_relative '../firehose'
66+77+require 'cbor'
88+require 'time'
99+1010+module Skyfall
1111+1212+ # @abstract
1313+ # Abstract base class representing a CBOR firehose message.
1414+ #
1515+ # Actual messages are returned as instances of one of the subclasses of this class,
1616+ # depending on the type of message, most commonly as {Skyfall::Firehose::CommitMessage}.
1717+ #
1818+ # The {new} method is overridden here so that it can be called with a binary data message
1919+ # from the websocket, and it parses the type from the appropriate frame and builds an
2020+ # instance of a matching subclass.
2121+ #
2222+ # You normally don't need to call this class directly, unless you're building a custom
2323+ # subclass of {Skyfall::Stream}, or reading raw data packets from the websocket through
2424+ # the {Skyfall::Stream#on_raw_message} event handler.
2525+2626+ class Firehose::Message
2727+ using Skyfall::Extensions
2828+2929+ # Type of the message (e.g. `:commit`, `:identity` etc.)
3030+ # @return [Symbol]
3131+ attr_reader :type
3232+3333+ # DID of the account (repo) that the event is sent by.
3434+ # @return [String, nil]
3535+ attr_reader :did
3636+3737+ # Sequential number of the message, to be used as a cursor when reconnecting.
3838+ # @return [Integer, nil]
3939+ attr_reader :seq
4040+4141+ alias repo did
4242+ alias kind type
4343+4444+ # First of the two CBOR objects forming the message payload, which mostly just includes the type field.
4545+ # @api private
4646+ # @return [Hash]
4747+ attr_reader :type_object
4848+4949+ # Second of the two CBOR objects forming the message payload, which contains the rest of the data.
5050+ # @api private
5151+ # @return [Hash]
5252+ attr_reader :data_object
5353+5454+ #
5555+ # Parses the CBOR objects from the binary data and returns an instance of an appropriate subclass.
5656+ #
5757+ # {Skyfall::Firehose::UnknownMessage} is returned if the message type is not recognized.
5858+ #
5959+ # @param data [String] binary payload of a firehose websocket message
6060+ # @return [Skyfall::Firehose::Message]
6161+ # @raise [Skyfall::DecodeError] if the structure of the message is invalid
6262+ # @raise [Skyfall::UnsupportedError] if the message has an unknown future version
6363+ # @raise [Skyfall::SubscriptionError] if the data contains an error message from the server
6464+ #
6565+ def self.new(data)
6666+ type_object, data_object = decode_cbor_objects(data)
6767+6868+ message_class = case type_object['t']
6969+ when '#account' then Firehose::AccountMessage
7070+ when '#commit' then Firehose::CommitMessage
7171+ when '#identity' then Firehose::IdentityMessage
7272+ when '#info' then Firehose::InfoMessage
7373+ when '#labels' then Firehose::LabelsMessage
7474+ when '#sync' then Firehose::SyncMessage
7575+ else Firehose::UnknownMessage
7676+ end
7777+7878+ if self != Firehose::Message && self != message_class
7979+ expected_type = self.name.split('::').last.gsub(/Message$/, '').downcase
8080+ raise DecodeError, "Expected ##{expected_type} message, got #{type_object['t']}"
8181+ end
8282+8383+ message = message_class.allocate
8484+ message.send(:initialize, type_object, data_object)
8585+ message
8686+ end
8787+8888+ #
8989+ # @private
9090+ # @param type_object [Hash] first decoded CBOR frame with metadata
9191+ # @param data_object [Hash] second decoded CBOR frame with payload
9292+ #
9393+ def initialize(type_object, data_object)
9494+ @type_object = type_object
9595+ @data_object = data_object
9696+9797+ @type = @type_object['t'][1..-1].to_sym
9898+ @did = @data_object['repo'] || @data_object['did']
9999+ @seq = @data_object['seq']
100100+ end
101101+102102+ #
103103+ # List of operations on records included in the message. Only `#commit` messages include
104104+ # operations, but for convenience the method is declared here and returns an empty array
105105+ # in other messages.
106106+ # @return [Array<Firehose::Operation>]
107107+ #
108108+ def operations
109109+ []
110110+ end
111111+112112+ #
113113+ # @return [Boolean] true if the message is {Firehose::UnknownMessage} (of unrecognized type)
114114+ #
115115+ def unknown?
116116+ self.is_a?(Firehose::UnknownMessage)
117117+ end
118118+119119+ #
120120+ # Timestamp decoded from the message.
121121+ #
122122+ # Note: this represents the time when the message was emitted from the original PDS, which
123123+ # might differ a lot from the `created_at` time saved in the record data, e.g. if user's local
124124+ # time is set incorrectly, or if an archive of existing posts was imported from another platform.
125125+ #
126126+ # @return [Time, nil]
127127+ #
128128+ def time
129129+ @time ||= @data_object['time'] && Time.iso8601(@data_object['time'])
130130+ end
131131+132132+ # Much faster version for Ruby 3.2+
133133+134134+ if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.2')
135135+ def time
136136+ @time ||= @data_object['time'] && Time.new(@data_object['time'])
137137+ end
138138+ end
139139+140140+ # Returns a string with a representation of the object for debugging purposes.
141141+ # @return [String]
142142+ def inspect
143143+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
144144+ "#<#{self.class}:0x#{object_id} #{vars}>"
145145+ end
146146+147147+148148+ protected
149149+150150+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
151151+ def inspectable_variables
152152+ instance_variables - [:@type_object, :@data_object, :@blocks]
153153+ end
154154+155155+156156+ private
157157+158158+ # Note: this method is written this way as an optimization
159159+ def check_if_not_nil(a, b = nil, c = nil, d = nil, e = nil, f = nil, g = nil)
160160+ ok = @data_object.has_key?(a)
161161+ ok &&= @data_object.has_key?(b) if b
162162+ ok &&= @data_object.has_key?(c) if c
163163+ ok &&= @data_object.has_key?(d) if d
164164+ ok &&= @data_object.has_key?(e) if e
165165+ ok &&= @data_object.has_key?(f) if f
166166+ ok &&= @data_object.has_key?(g) if g
167167+168168+ if !ok
169169+ expected_fields = [a, b, c, d, e, f, g].compact
170170+ missing_fields = expected_fields.select { |x| @data_object[x].nil? }
171171+ raise DecodeError.new("Missing event details (#{missing_fields.map(&:to_s).join(', ')})")
172172+ end
173173+ end
174174+175175+ def self.decode_cbor_objects(data)
176176+ objects = CBOR.decode_sequence(data)
177177+178178+ if objects.length < 2
179179+ raise DecodeError.new("Malformed message: #{objects.inspect}")
180180+ elsif objects.length > 2
181181+ raise DecodeError.new("Invalid number of objects: #{objects.length}")
182182+ end
183183+184184+ type, data = objects
185185+186186+ if data['error']
187187+ raise SubscriptionError.new(data['error'], data['message'])
188188+ end
189189+190190+ raise DecodeError.new("Invalid object type: #{type.inspect}") unless type.is_a?(Hash)
191191+ raise DecodeError.new("Missing data: #{type.inspect}") unless type['op'] && type['t']
192192+ raise DecodeError.new("Invalid object type: #{type['op'].inspect}") unless type['op'].is_a?(Integer)
193193+ raise DecodeError.new("Invalid object type: #{type['t'].inspect}") unless type['t'].is_a?(String)
194194+ raise DecodeError.new("Invalid message type: #{type['t'].inspect}") unless type['t'].start_with?('#')
195195+ raise UnsupportedError.new("Unsupported version: #{type['op']}") unless type['op'] == 1
196196+ raise DecodeError.new("Invalid object type: #{data.inspect}") unless data.is_a?(Hash)
197197+198198+ [type, data]
199199+ end
200200+201201+ private_class_method :decode_cbor_objects
202202+ end
203203+end
204204+205205+# need to be at the end because of a circular dependency
206206+207207+require_relative 'account_message'
208208+require_relative 'commit_message'
209209+require_relative 'identity_message'
210210+require_relative 'info_message'
211211+require_relative 'labels_message'
212212+require_relative 'sync_message'
213213+require_relative 'unknown_message'
+110
lib/skyfall/firehose/operation.rb
···11+# frozen_string_literal: true
22+33+require_relative '../collection'
44+require_relative '../firehose'
55+66+module Skyfall
77+88+ #
99+ # A single record operation from a firehose commit event. An operation is a new record being
1010+ # created, or an existing record modified or deleted. It includes the URI and other details of
1111+ # the record in question, type of the action taken, and record data for "created" and "update"
1212+ # actions.
1313+ #
1414+ # Note: when a record is deleted, the previous record data is *not* included in the commit, only
1515+ # its URI. This means that if you're tracking records which are referencing other records, e.g.
1616+ # follow, block, or like records, you need to store information about this referencing record
1717+ # including an URI or rkey, because in case of a delete, you will not get information about which
1818+ # post was unliked or which account was unfollowed, only which like/follow record was deleted.
1919+ #
2020+ # At the moment, Skyfall doesn't parse the record data into any rich models specific for a given
2121+ # record type with a convenient API, but simply returns them as `Hash` objects (see {#raw_record}).
2222+ # In the future, a separate `#record` method might be added which returns a parsed record model.
2323+ #
2424+2525+ class Firehose::Operation
2626+2727+ #
2828+ # @param message [Skyfall::Firehose::Message] commit message the operation is included in
2929+ # @param json [Hash] operation data
3030+ #
3131+ def initialize(message, json)
3232+ @message = message
3333+ @json = json
3434+ end
3535+3636+ # @return [String] DID of the account/repository in which the operation happened
3737+ def repo
3838+ @message.repo
3939+ end
4040+4141+ alias did repo
4242+4343+ # @return [String] path part of the record URI (collection + rkey)
4444+ # @deprecated Use {#collection} + {#rkey}
4545+ def path
4646+ @@path_warning_printed ||= false
4747+4848+ unless @@path_warning_printed
4949+ $stderr.puts "Warning: Skyfall::Firehose::Operation#path is deprecated - use #collection + #rkey"
5050+ @@path_warning_printed = true
5151+ end
5252+5353+ @json['path']
5454+ end
5555+5656+ # @return [Symbol] type of the operation (`:create`, `:update` or `:delete`)
5757+ def action
5858+ @json['action'].to_sym
5959+ end
6060+6161+ # @return [String] record collection NSID
6262+ def collection
6363+ @json['path'].split('/')[0]
6464+ end
6565+6666+ # @return [String] record rkey
6767+ def rkey
6868+ @json['path'].split('/')[1]
6969+ end
7070+7171+ # @return [String] full AT URI of the record
7272+ def uri
7373+ "at://#{repo}/#{@json['path']}"
7474+ end
7575+7676+ # @return [CID, nil] CID (Content Identifier) of the record (nil for delete operations)
7777+ def cid
7878+ @cid ||= @json['cid'] && CID.from_cbor_tag(@json['cid'])
7979+ end
8080+8181+ # @return [Hash, nil] record data as a plain Ruby Hash (nil for delete operations)
8282+ def raw_record
8383+ @raw_record ||= @message.raw_record_for_operation(self)
8484+ end
8585+8686+ # Symbol short code of the collection, like `:bsky_post`. If the collection NSID is not
8787+ # recognized, the type is `:unknown`. The full NSID is always available through the
8888+ # `#collection` property.
8989+ #
9090+ # @return [Symbol]
9191+ # @see Skyfall::Collection
9292+ #
9393+ def type
9494+ Collection.short_code(collection)
9595+ end
9696+9797+ # Returns a string with a representation of the object for debugging purposes.
9898+ # @return [String]
9999+ def inspect
100100+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
101101+ "#<#{self.class}:0x#{object_id} #{vars}>"
102102+ end
103103+104104+ private
105105+106106+ def inspectable_variables
107107+ instance_variables - [:@message]
108108+ end
109109+ end
110110+end
+40
lib/skyfall/firehose/sync_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../car_archive'
44+require_relative '../firehose'
55+require_relative 'message'
66+77+module Skyfall
88+99+ #
1010+ # Firehose message which declares the current state of the repository. The message is meant to
1111+ # trigger a resynchronization of the repository from a receiving consumer, if the consumer detects
1212+ # from the message rev that it must have missed some events from that repository.
1313+ #
1414+ # The sync message can be emitted by a PDS or relay to force a repair of a broken account state,
1515+ # or e.g. when an account is created, migrated or recovered from a CAR backup.
1616+ #
1717+1818+ class Firehose::SyncMessage < Firehose::Message
1919+2020+ #
2121+ # @private
2222+ # @param type_object [Hash] first decoded CBOR frame with metadata
2323+ # @param data_object [Hash] second decoded CBOR frame with payload
2424+ # @raise [DecodeError] if the message doesn't include required data
2525+ #
2626+ def initialize(type_object, data_object)
2727+ super
2828+ check_if_not_nil 'seq', 'did', 'blocks', 'rev', 'time'
2929+ end
3030+3131+ def rev
3232+ @rev ||= @data_object['rev']
3333+ end
3434+3535+ # @return [Skyfall::CarArchive] commit data in the form of a parsed CAR archive
3636+ def blocks
3737+ @blocks ||= CarArchive.new(@data_object['blocks'])
3838+ end
3939+ end
4040+end
+14
lib/skyfall/firehose/unknown_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../firehose'
44+require_relative 'message'
55+66+module Skyfall
77+88+ #
99+ # Firehose message of an unrecognized type.
1010+ #
1111+1212+ class Firehose::UnknownMessage < Firehose::Message
1313+ end
1414+end
+166
lib/skyfall/firehose.rb
···11+# frozen_string_literal: true
22+33+require_relative 'stream'
44+require 'uri'
55+66+module Skyfall
77+88+ #
99+ # Client of a standard AT Protocol firehose websocket.
1010+ #
1111+ # This is the main Skyfall class to use to connect to a CBOR-based firehose
1212+ # websocket endpoint like `subscribeRepos` (on a PDS or a relay).
1313+ #
1414+ # To connect to the firehose, you need to:
1515+ #
1616+ # * create an instance of {Firehose}, passing it the hostname/URL of the server,
1717+ # name of the endpoint (normally `:subscribe_repos`) and optionally a cursor
1818+ # * set up callbacks to be run when connecting, disconnecting, when a message
1919+ # is received etc. (you need to set at least a message handler)
2020+ # * call {#connect} to start the connection
2121+ # * handle the received messages (instances of a {Skyfall::Firehose::Message}
2222+ # subclass)
2323+ #
2424+ # @example
2525+ # client = Skyfall::Firehose.new('bsky.network', :subscribe_repos, last_cursor)
2626+ # # or: client = Skyfall::Firehose.new('bsky.network', last_cursor)
2727+ #
2828+ # client.on_message do |msg|
2929+ # next unless msg.type == :commit
3030+ #
3131+ # msg.operations.each do |op|
3232+ # if op.type == :bsky_post && op.action == :create
3333+ # puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}"
3434+ # end
3535+ # end
3636+ # end
3737+ #
3838+ # client.connect
3939+ #
4040+ # # You might also want to set some or all of these lifecycle callback handlers:
4141+ #
4242+ # client.on_connecting { |url| puts "Connecting to #{url}..." }
4343+ # client.on_connect { puts "Connected" }
4444+ # client.on_disconnect { puts "Disconnected" }
4545+ # client.on_reconnect { puts "Connection lost, trying to reconnect..." }
4646+ # client.on_timeout { puts "Connection stalled, triggering a reconnect..." }
4747+ # client.on_error { |e| puts "ERROR: #{e}" }
4848+ #
4949+ # @note Most of the methods of this class that you might want to use are defined in {Skyfall::Stream}.
5050+ #
5151+5252+ class Firehose < Stream
5353+5454+ # the main firehose endpoint on a PDS or relay
5555+ SUBSCRIBE_REPOS = "com.atproto.sync.subscribeRepos"
5656+5757+ # only used with moderation services (labellers)
5858+ SUBSCRIBE_LABELS = "com.atproto.label.subscribeLabels"
5959+6060+ NAMED_ENDPOINTS = {
6161+ :subscribe_repos => SUBSCRIBE_REPOS,
6262+ :subscribe_labels => SUBSCRIBE_LABELS
6363+ }
6464+6565+ # Current cursor (seq of the last seen message)
6666+ # @return [Integer, nil]
6767+ attr_accessor :cursor
6868+6969+ #
7070+ # @overload initialize(server, endpoint, cursor = nil)
7171+ # Returns a new instance of a firehose client connecting to a given endpoint.
7272+ #
7373+ # @param server [String]
7474+ # Address of the server to connect to.
7575+ # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.
7676+ # @param endpoint [Symbol, String]
7777+ # XRPC method name.
7878+ # Pass either a full NSID, or a symbol shorthand from {NAMED_ENDPOINTS}
7979+ # @param cursor [Integer, String, nil]
8080+ # sequence number from which to resume
8181+ # @raise [ArgumentError] if any of the parameters is invalid
8282+ #
8383+ # @overload initialize(server, cursor = nil)
8484+ # Returns a new instance of a firehose client connecting to `subscribeRepos`.
8585+ #
8686+ # @param server [String]
8787+ # Address of the server to connect to.
8888+ # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.
8989+ # @param cursor [Integer, String, nil]
9090+ # sequence number from which to resume
9191+ # @raise [ArgumentError] if any of the parameters is invalid
9292+ #
9393+9494+ def initialize(server, endpoint = nil, cursor = nil)
9595+ require_relative 'firehose/message'
9696+ super(server)
9797+9898+ if cursor.nil? && (endpoint.nil? || endpoint.to_s =~ /\A\d+\z/)
9999+ cursor = endpoint
100100+ endpoint = :subscribe_repos
101101+ end
102102+103103+ @endpoint = check_endpoint(endpoint)
104104+ @cursor = check_cursor(cursor)
105105+ @root_url = ensure_empty_path(@root_url)
106106+ end
107107+108108+109109+ protected
110110+111111+ # Returns the full URL of the websocket endpoint to connect to.
112112+ # @return [String]
113113+114114+ def build_websocket_url
115115+ @root_url + "/xrpc/" + @endpoint + (@cursor ? "?cursor=#{@cursor}" : "")
116116+ end
117117+118118+ # Processes a single message received from the websocket. Passes the received data to the
119119+ # {#on_raw_message} handler, builds a {Skyfall::Firehose::Message} object, and passes it to
120120+ # the {#on_message} handler (if defined). Also updates the {#cursor} to this message's sequence
121121+ # number (note: this is skipped if {#on_message} is not set).
122122+ #
123123+ # @param msg
124124+ # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent}
125125+ # @return [nil]
126126+127127+ def handle_message(msg)
128128+ data = msg.data
129129+ @handlers[:raw_message]&.call(data)
130130+131131+ if @handlers[:message]
132132+ atp_message = Message.new(data)
133133+ @cursor = atp_message.seq
134134+ @handlers[:message].call(atp_message)
135135+ else
136136+ @cursor = nil
137137+ end
138138+ end
139139+140140+141141+ private
142142+143143+ def check_cursor(cursor)
144144+ if cursor.nil?
145145+ nil
146146+ elsif cursor.is_a?(Integer) || cursor.is_a?(String) && cursor =~ /^[0-9]+$/
147147+ cursor.to_i
148148+ else
149149+ raise ArgumentError, "Invalid cursor: #{cursor.inspect} - cursor must be an integer number"
150150+ end
151151+ end
152152+153153+ def check_endpoint(endpoint)
154154+ if endpoint.is_a?(String)
155155+ raise ArgumentError.new("Invalid endpoint name: #{endpoint}") if endpoint.strip == '' || !endpoint.include?('.')
156156+ elsif endpoint.is_a?(Symbol)
157157+ raise ArgumentError.new("Unknown endpoint: #{endpoint}") if NAMED_ENDPOINTS[endpoint].nil?
158158+ endpoint = NAMED_ENDPOINTS[endpoint]
159159+ else
160160+ raise ArgumentError, "Endpoint should be a string or a symbol"
161161+ end
162162+163163+ endpoint
164164+ end
165165+ end
166166+end
+39
lib/skyfall/jetstream/account_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../errors'
44+require_relative '../jetstream'
55+require_relative 'message'
66+77+module Skyfall
88+99+ #
1010+ # Jetstream message sent when the status of an account changes. This can be:
1111+ #
1212+ # - an account being created, sending its initial state (should be active)
1313+ # - an account being deactivated or suspended
1414+ # - an account being restored back to an active state from deactivation/suspension
1515+ # - an account being deleted (the status returning `:deleted`)
1616+ #
1717+1818+ class Jetstream::AccountMessage < Jetstream::Message
1919+2020+ #
2121+ # @param json [Hash] message JSON decoded from the websocket message
2222+ # @raise [DecodeError] if the message doesn't include required data
2323+ #
2424+ def initialize(json)
2525+ raise DecodeError.new("Missing event details (account)") if json['account'].nil? || json['account']['active'].nil?
2626+ super
2727+ end
2828+2929+ # @return [Boolean] true if the account is active, false if it's deactivated/suspended etc.
3030+ def active?
3131+ @json['account']['active']
3232+ end
3333+3434+ # @return [Symbol, nil] for inactive accounts, specifies the exact state; nil for active accounts
3535+ def status
3636+ @json['account']['status']&.to_sym
3737+ end
3838+ end
3939+end
+49
lib/skyfall/jetstream/commit_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../errors'
44+require_relative '../jetstream'
55+require_relative 'message'
66+require_relative 'operation'
77+88+module Skyfall
99+1010+ #
1111+ # Jetstream message which includes a single operation on a record in the repo (a record was
1212+ # created, updated or deleted). Most of the messages received from Jetstream are of this type,
1313+ # and this is the type you will usually be most interested in.
1414+ #
1515+1616+ class Jetstream::CommitMessage < Jetstream::Message
1717+1818+ #
1919+ # @param json [Hash] message JSON decoded from the websocket message
2020+ # @raise [DecodeError] if the message doesn't include required data
2121+ #
2222+ def initialize(json)
2323+ raise DecodeError.new("Missing event details (commit)") if json['commit'].nil?
2424+2525+ %w(collection rkey operation).each { |f| raise DecodeError.new("Missing event details (#{f})") if json['commit'][f].nil? }
2626+2727+ super
2828+ end
2929+3030+ # Returns the record operation included in the commit.
3131+ # @return [Jetstream::Operation]
3232+ #
3333+ def operation
3434+ @operation ||= Jetstream::Operation.new(self, json['commit'])
3535+ end
3636+3737+ alias op operation
3838+3939+ # Returns record operations included in the commit. Currently a `:commit` message from
4040+ # Jetstream always includes exactly one operation, but for compatibility with
4141+ # {Skyfall::Firehose}'s API it's also returned in an array here.
4242+ #
4343+ # @return [Array<Jetstream::Operation>]
4444+ #
4545+ def operations
4646+ [operation]
4747+ end
4848+ end
4949+end
+36
lib/skyfall/jetstream/identity_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../errors'
44+require_relative '../jetstream'
55+require_relative 'message'
66+77+module Skyfall
88+99+ #
1010+ # Jetstream message sent when a new DID is created or when the details of someone's DID document
1111+ # are changed (usually either a handle change or a migration to a different PDS). The message
1212+ # should include currently assigned handle (though the field is not required).
1313+ #
1414+ # Note: the message is originally emitted from the account's PDS and is passed as is by relays,
1515+ # which means you can't fully trust that the handle is actually correctly assigned to the DID
1616+ # and verified by DNS or well-known. To confirm that, use `DID.resolve_handle` from
1717+ # [DIDKit](https://ruby.sdk.blue/didkit/).
1818+ #
1919+2020+ class Jetstream::IdentityMessage < Jetstream::Message
2121+2222+ #
2323+ # @param json [Hash] message JSON decoded from the websocket message
2424+ # @raise [DecodeError] if the message doesn't include required data
2525+ #
2626+ def initialize(json)
2727+ raise DecodeError.new("Missing event details (identity)") if json['identity'].nil?
2828+ super
2929+ end
3030+3131+ # @return [String, nil] current handle assigned to the DID
3232+ def handle
3333+ @json['identity']['handle']
3434+ end
3535+ end
3636+end
+141
lib/skyfall/jetstream/message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../errors'
44+require_relative '../jetstream'
55+66+require 'time'
77+88+module Skyfall
99+1010+ # @abstract
1111+ # Abstract base class representing a Jetstream message.
1212+ #
1313+ # Actual messages are returned as instances of one of the subclasses of this class,
1414+ # depending on the type of message, most commonly as {Skyfall::Jetstream::CommitMessage}.
1515+ #
1616+ # The {new} method is overridden here so that it can be called with a JSON message from
1717+ # the websocket, and it parses the type from the JSON and builds an instance of a matching
1818+ # subclass.
1919+ #
2020+ # You normally don't need to call this class directly, unless you're building a custom
2121+ # subclass of {Skyfall::Stream} or reading raw data packets from the websocket through
2222+ # the {Skyfall::Stream#on_raw_message} event handler.
2323+2424+ class Jetstream::Message
2525+2626+ # Type of the message (e.g. `:commit`, `:identity` etc.)
2727+ # @return [Symbol]
2828+ attr_reader :type
2929+3030+ # DID of the account (repo) that the event is sent by
3131+ # @return [String]
3232+ attr_reader :did
3333+3434+ # Server timestamp of the message (in Unix time microseconds), which serves as a cursor
3535+ # when reconnecting; an equivalent of {Skyfall::Firehose::Message#seq} in CBOR firehose
3636+ # messages.
3737+ # @return [Integer]
3838+ attr_reader :time_us
3939+4040+ alias repo did
4141+ alias seq time_us
4242+ alias kind type
4343+4444+ # The raw JSON of the message as parsed from the websocket packet.
4545+ attr_reader :json
4646+4747+ #
4848+ # Parses the JSON data from a websocket message and returns an instance of an appropriate subclass.
4949+ #
5050+ # {Skyfall::Jetstream::UnknownMessage} is returned if the message type is not recognized.
5151+ #
5252+ # @param data [String] plain text payload of a Jetstream websocket message
5353+ # @return [Skyfall::Jetstream::Message]
5454+ # @raise [DecodeError] if the message doesn't include required data
5555+ #
5656+ def self.new(data)
5757+ json = JSON.parse(data)
5858+5959+ message_class = case json['kind']
6060+ when 'account' then Jetstream::AccountMessage
6161+ when 'commit' then Jetstream::CommitMessage
6262+ when 'identity' then Jetstream::IdentityMessage
6363+ else Jetstream::UnknownMessage
6464+ end
6565+6666+ if self != Jetstream::Message && self != message_class
6767+ expected_type = self.name.split('::').last.gsub(/Message$/, '').downcase
6868+ raise DecodeError, "Expected '#{expected_type}' message, got '#{json['kind']}'"
6969+ end
7070+7171+ message = message_class.allocate
7272+ message.send(:initialize, json)
7373+ message
7474+ end
7575+7676+ #
7777+ # @param json [Hash] message JSON decoded from the websocket message
7878+ # @raise [DecodeError] if the message doesn't include required data
7979+ #
8080+ def initialize(json)
8181+ %w(kind did time_us).each { |f| raise DecodeError.new("Missing event details (#{f})") if json[f].nil? }
8282+8383+ @json = json
8484+ @type = @json['kind'].to_sym
8585+ @did = @json['did']
8686+ @time_us = @json['time_us']
8787+ end
8888+8989+ #
9090+ # @return [Boolean] true if the message is {Jetstream::UnknownMessage} (of unrecognized type)
9191+ #
9292+ def unknown?
9393+ self.is_a?(Jetstream::UnknownMessage)
9494+ end
9595+9696+ # Returns a record operation included in the message. Only `:commit` messages include
9797+ # operations, but for convenience the method is declared here and returns nil in other messages.
9898+ #
9999+ # @return [nil]
100100+ #
101101+ def operation
102102+ nil
103103+ end
104104+105105+ alias op operation
106106+107107+ # List of operations on records included in the message. Only `:commit` messages include
108108+ # operations, but for convenience the method is declared here and returns an empty array
109109+ # in other messages.
110110+ #
111111+ # @return [Array<Jetstream::Operation>]
112112+ #
113113+ def operations
114114+ []
115115+ end
116116+117117+ #
118118+ # Timestamp decoded from the message.
119119+ #
120120+ # Note: the time is read from the {#time_us} field, which stores the event time as an integer in
121121+ # Unix time microseconds, and which is used as an equivalent of {Skyfall::Firehose::Message#seq}
122122+ # in CBOR firehose messages. This timestamp represents the time when the message was received
123123+ # and stored by Jetstream, which might differ a lot from the `created_at` time saved in the
124124+ # record data, e.g. if user's local time is set incorrectly or if an archive of existing posts
125125+ # was imported from another platform. It will also differ (usually only slightly) from the
126126+ # timestamp of the original CBOR message emitted from the PDS and passed through the relay.
127127+ #
128128+ # @return [Time]
129129+ #
130130+ def time
131131+ @time ||= Time.at(@time_us / 1_000_000.0)
132132+ end
133133+ end
134134+end
135135+136136+# need to be at the end because of a circular dependency
137137+138138+require_relative 'account_message'
139139+require_relative 'commit_message'
140140+require_relative 'identity_message'
141141+require_relative 'unknown_message'
+110
lib/skyfall/jetstream/operation.rb
···11+# frozen_string_literal: true
22+33+require_relative '../collection'
44+require_relative '../jetstream'
55+66+module Skyfall
77+88+ #
99+ # A single record operation from a Jetstream commit event. An operation is a new record being
1010+ # created, or an existing record modified or deleted. It includes the URI and other details of
1111+ # the record in question, type of the action taken, and record data for "created" and "update"
1212+ # actions.
1313+ #
1414+ # Note: when a record is deleted, the previous record data is *not* included in the commit, only
1515+ # its URI. This means that if you're tracking records which are referencing other records, e.g.
1616+ # follow, block, or like records, you need to store information about this referencing record
1717+ # including an URI or rkey, because in case of a delete, you will not get information about which
1818+ # post was unliked or which account was unfollowed, only which like/follow record was deleted.
1919+ #
2020+ # At the moment, Skyfall doesn't parse the record data into any rich models specific for a given
2121+ # record type with a convenient API, but simply returns them as `Hash` objects (see {#raw_record}).
2222+ # In the future, a second `#record` method might be added which returns a parsed record model.
2323+ #
2424+2525+ class Jetstream::Operation
2626+2727+ #
2828+ # @param message [Skyfall::Jetstream::Message] commit message the operation is parsed from
2929+ # @param json [Hash] operation data
3030+ #
3131+ def initialize(message, json)
3232+ @message = message
3333+ @json = json
3434+ end
3535+3636+ # @return [String] DID of the account/repository in which the operation happened
3737+ def repo
3838+ @message.repo
3939+ end
4040+4141+ alias did repo
4242+4343+ # @return [String] path part of the record URI (collection + rkey)
4444+ # @deprecated Use {#collection} + {#rkey}
4545+ def path
4646+ @@path_warning_printed ||= false
4747+4848+ unless @@path_warning_printed
4949+ $stderr.puts "Warning: Skyfall::Jetstream::Operation#path is deprecated - use #collection + #rkey"
5050+ @@path_warning_printed = true
5151+ end
5252+5353+ @json['collection'] + '/' + @json['rkey']
5454+ end
5555+5656+ # @return [Symbol] type of the operation (`:create`, `:update` or `:delete`)
5757+ def action
5858+ @json['operation'].to_sym
5959+ end
6060+6161+ # @return [String] record collection NSID
6262+ def collection
6363+ @json['collection']
6464+ end
6565+6666+ # @return [String] record rkey
6767+ def rkey
6868+ @json['rkey']
6969+ end
7070+7171+ # @return [String] full AT URI of the record
7272+ def uri
7373+ "at://#{repo}/#{collection}/#{rkey}"
7474+ end
7575+7676+ # @return [CID, nil] CID (Content Identifier) of the record (nil for delete operations)
7777+ def cid
7878+ @cid ||= @json['cid'] && CID.from_json(@json['cid'])
7979+ end
8080+8181+ # @return [Hash, nil] record data as a plain Ruby Hash (nil for delete operations)
8282+ def raw_record
8383+ @json['record']
8484+ end
8585+8686+ # Symbol short code of the collection, like `:bsky_post`. If the collection NSID is not
8787+ # recognized, the type is `:unknown`. The full NSID is always available through the
8888+ # `#collection` property.
8989+ #
9090+ # @return [Symbol]
9191+ # @see Skyfall::Collection
9292+ #
9393+ def type
9494+ Collection.short_code(collection)
9595+ end
9696+9797+ # Returns a string with a representation of the object for debugging purposes.
9898+ # @return [String]
9999+ def inspect
100100+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
101101+ "#<#{self.class}:0x#{object_id} #{vars}>"
102102+ end
103103+104104+ private
105105+106106+ def inspectable_variables
107107+ instance_variables - [:@message]
108108+ end
109109+ end
110110+end
+14
lib/skyfall/jetstream/unknown_message.rb
···11+# frozen_string_literal: true
22+33+require_relative '../jetstream'
44+require_relative 'message'
55+66+module Skyfall
77+88+ #
99+ # Jetstream message of an unrecognized type.
1010+ #
1111+1212+ class Jetstream::UnknownMessage < Jetstream::Message
1313+ end
1414+end
+196
lib/skyfall/jetstream.rb
···11+# frozen_string_literal: true
22+33+require_relative 'stream'
44+55+require 'json'
66+require 'time'
77+require 'uri'
88+99+module Skyfall
1010+1111+ #
1212+ # Client of a Jetstream service (JSON-based firehose).
1313+ #
1414+ # This is an equivalent of {Skyfall::Firehose} for Jetstream sources, mirroring its API.
1515+ # It returns messages as instances of subclasses of {Skyfall::Jetstream::Message}, which
1616+ # are generally equivalent to the respective {Skyfall::Firehose::Message} variants as much
1717+ # as possible.
1818+ #
1919+ # To connect to a Jetstream websocket, you need to:
2020+ #
2121+ # * create an instance of Jetstream, passing it the hostname/URL of the server, and optionally
2222+ # parameters such as cursor or collection/DID filters
2323+ # * set up callbacks to be run when connecting, disconnecting, when a message is received etc.
2424+ # (you need to set at least a message handler)
2525+ # * call {#connect} to start the connection
2626+ # * handle the received messages
2727+ #
2828+ # @example
2929+ # client = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
3030+ # wanted_collections: 'app.bsky.feed.post',
3131+ # wanted_dids: @dids
3232+ # })
3333+ #
3434+ # client.on_message do |msg|
3535+ # next unless msg.type == :commit
3636+ #
3737+ # op = msg.operation
3838+ #
3939+ # if op.type == :bsky_post && op.action == :create
4040+ # puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}"
4141+ # end
4242+ # end
4343+ #
4444+ # client.connect
4545+ #
4646+ # # You might also want to set some or all of these lifecycle callback handlers:
4747+ #
4848+ # client.on_connecting { |url| puts "Connecting to #{url}..." }
4949+ # client.on_connect { puts "Connected" }
5050+ # client.on_disconnect { puts "Disconnected" }
5151+ # client.on_reconnect { puts "Connection lost, trying to reconnect..." }
5252+ # client.on_timeout { puts "Connection stalled, triggering a reconnect..." }
5353+ # client.on_error { |e| puts "ERROR: #{e}" }
5454+ #
5555+ # @note Most of the methods of this class that you might want to use are defined in {Skyfall::Stream}.
5656+ #
5757+5858+ class Jetstream < Stream
5959+6060+ # Current cursor (time of the last seen message)
6161+ # @return [Integer, nil]
6262+ attr_accessor :cursor
6363+6464+ #
6565+ # @param server [String] Address of the server to connect to.
6666+ # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.
6767+ # @param params [Hash] options, see below:
6868+ #
6969+ # @option params [Integer] :cursor
7070+ # cursor from which to resume
7171+ #
7272+ # @option params [Array<String>] :wanted_dids
7373+ # DID filter to pass to the server (`:wantedDids` is also accepted);
7474+ # value should be a DID string or an array of those
7575+ #
7676+ # @option params [Array<String, Symbol>] :wanted_collections
7777+ # collection filter to pass to the server (`:wantedCollections` is also accepted);
7878+ # value should be an NSID string or a symbol shorthand, or an array of those
7979+ #
8080+ # @raise [ArgumentError] if the server parameter or the options are invalid
8181+ #
8282+ def initialize(server, params = {})
8383+ require_relative 'jetstream/message'
8484+ super(server)
8585+8686+ @params = check_params(params)
8787+ @cursor = @params.delete(:cursor)
8888+ @root_url = ensure_empty_path(@root_url)
8989+ end
9090+9191+9292+ protected
9393+9494+ # Returns the full URL of the websocket endpoint to connect to.
9595+ # @return [String]
9696+9797+ def build_websocket_url
9898+ params = @cursor ? @params.merge(cursor: @cursor) : @params
9999+ query = URI.encode_www_form(params)
100100+101101+ @root_url + "/subscribe" + (query.length > 0 ? "?#{query}" : '')
102102+ end
103103+104104+ # Processes a single message received from the websocket. Passes the received data to the
105105+ # {#on_raw_message} handler, builds a {Skyfall::Jetstream::Message} object, and passes it to
106106+ # the {#on_message} handler (if defined). Also updates the {#cursor} to this message's
107107+ # microsecond timestamp (note: this is skipped if {#on_message} is not set).
108108+ #
109109+ # @param msg
110110+ # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent}
111111+ # @return [nil]
112112+113113+ def handle_message(msg)
114114+ data = msg.data
115115+ @handlers[:raw_message]&.call(data)
116116+117117+ if @handlers[:message]
118118+ jet_message = Message.new(data)
119119+ @cursor = jet_message.time_us
120120+ @handlers[:message].call(jet_message)
121121+ else
122122+ @cursor = nil
123123+ end
124124+ end
125125+126126+127127+ private
128128+129129+ def check_params(params)
130130+ params ||= {}
131131+ processed = {}
132132+133133+ raise ArgumentError.new("Params should be a hash") unless params.is_a?(Hash)
134134+135135+ params.each do |k, v|
136136+ next if v.nil?
137137+138138+ if k.is_a?(Symbol)
139139+ k = k.to_s
140140+ elsif !k.is_a?(String)
141141+ raise ArgumentError.new("Invalid params key: #{k.inspect}")
142142+ end
143143+144144+ k = k.gsub(/_([a-zA-Z])/) { $1.upcase }.to_sym
145145+ processed[k] = check_option(k, v)
146146+ end
147147+148148+ processed
149149+ end
150150+151151+ def check_option(k, v)
152152+ case k
153153+ when :wantedCollections
154154+ check_wanted_collections(v)
155155+ when :wantedDids
156156+ check_wanted_dids(v)
157157+ when :cursor
158158+ check_cursor(v)
159159+ when :compress, :requireHello
160160+ raise ArgumentError.new("Skyfall::Jetstream doesn't support the #{k.inspect} option yet")
161161+ else
162162+ raise ArgumentError.new("Unknown option: #{k.inspect}")
163163+ end
164164+ end
165165+166166+ def check_wanted_collections(list)
167167+ list = [list] unless list.is_a?(Array)
168168+169169+ list.map do |c|
170170+ if c.is_a?(String)
171171+ # TODO: more validation
172172+ c
173173+ elsif c.is_a?(Symbol)
174174+ Collection.from_short_code(c) or raise ArgumentError.new("Unknown collection symbol: #{c.inspect}")
175175+ else
176176+ raise ArgumentError.new("Invalid collection argument: #{c.inspect}")
177177+ end
178178+ end
179179+ end
180180+181181+ def check_wanted_dids(list)
182182+ list = [list] unless list.is_a?(Array)
183183+184184+ if x = list.detect { |c| !c.is_a?(String) || c !~ /\Adid:[a-z]+:/ }
185185+ raise ArgumentError.new("Invalid DID argument: #{x.inspect}")
186186+ end
187187+188188+ # TODO: more validation
189189+ list
190190+ end
191191+192192+ def check_cursor(cursor)
193193+ cursor.to_i
194194+ end
195195+ end
196196+end
+96
lib/skyfall/label.rb
···11+# frozen_string_literal: true
22+33+require_relative 'errors'
44+require 'time'
55+66+module Skyfall
77+88+ #
99+ # A single label emitted from the "subscribeLabels" firehose of a labeller service.
1010+ #
1111+ # The label assigns some specific value - from a list of available values defined by this
1212+ # labeller - to a specific target (at:// URI or a DID). In general, this will usually be either
1313+ # a "badge" that a user requested to be assigned to themselves from a fun/informative labeller,
1414+ # or some kind of (likely negative) label assigned to a user or post by a moderation labeller.
1515+ #
1616+ # You generally don't need to create instances of this class manually, but will receive them
1717+ # from {Skyfall::Firehose} that's connected to `:subscribe_labels` in the {Stream#on_message}
1818+ # callback handler (wrapped in a {Skyfall::Firehose::LabelsMessage}).
1919+ #
2020+2121+ class Label
2222+2323+ # @return [Hash] the label's JSON data
2424+ attr_reader :data
2525+2626+ #
2727+ # @param data [Hash] raw label JSON
2828+ # @raise [Skyfall::DecodeError] if the data has an invalid format
2929+ # @raise [Skyfall::UnsupportedError] if the label is in an unsupported future version
3030+ #
3131+ def initialize(data)
3232+ @data = data
3333+3434+ raise DecodeError.new("Missing version: #{data}") unless data.has_key?('ver')
3535+ raise DecodeError.new("Invalid version: #{ver}") unless ver.is_a?(Integer) && ver >= 1
3636+ raise UnsupportedError.new("Unsupported version: #{ver}") unless ver == 1
3737+3838+ raise DecodeError.new("Missing source: #{data}") unless data.has_key?('src')
3939+ raise DecodeError.new("Invalid source: #{src}") unless src.is_a?(String) && src.start_with?('did:')
4040+4141+ raise DecodeError.new("Missing uri: #{data}") unless data.has_key?('uri')
4242+ raise DecodeError.new("Invalid uri: #{uri}") unless uri.is_a?(String)
4343+ raise DecodeError.new("Invalid uri: #{uri}") unless uri.start_with?('at://') || uri.start_with?('did:')
4444+ end
4545+4646+ # @return [Integer] label format version number
4747+ def version
4848+ @data['ver']
4949+ end
5050+5151+ # DID of the labelling authority (the labeller service).
5252+ # @return [String]
5353+ def authority
5454+ @data['src']
5555+ end
5656+5757+ # AT URI or DID of the labelled subject (e.g. a user or post).
5858+ # @return [String]
5959+ def subject
6060+ @data['uri']
6161+ end
6262+6363+ # @return [CID, nil] CID of the specific version of the subject that this label applies to
6464+ def cid
6565+ @cid ||= @data['cid'] && CID.from_json(@data['cid'])
6666+ end
6767+6868+ # @return [String] label value
6969+ def value
7070+ @data['val']
7171+ end
7272+7373+ # @return [Boolean] if true, then this is a negation (delete) of an existing label
7474+ def negation?
7575+ !!@data['neg']
7676+ end
7777+7878+ # @return [Time] timestamp when the label was created
7979+ def created_at
8080+ @created_at ||= Time.parse(@data['cts'])
8181+ end
8282+8383+ # @return [Time, nil] optional timestamp when the label expires
8484+ def expires_at
8585+ @expires_at ||= @data['exp'] && Time.parse(@data['exp'])
8686+ end
8787+8888+ alias ver version
8989+ alias src authority
9090+ alias uri subject
9191+ alias val value
9292+ alias neg negation?
9393+ alias cts created_at
9494+ alias exp expires_at
9595+ end
9696+end
···11-module Skyfall
22- class HandleMessage < WebsocketMessage
33- def handle
44- @data_object['handle']
55- end
66- end
77-end
-22
lib/skyfall/messages/info_message.rb
···11-module Skyfall
22- class InfoMessage < WebsocketMessage
33- attr_reader :name, :message
44-55- OUTDATED_CURSOR = "OutdatedCursor"
66-77- def initialize(type_object, data_object)
88- super
99-1010- @name = @data_object['name']
1111- @message = @data_object['message']
1212- end
1313-1414- def to_s
1515- (@name || "InfoMessage") + (@message ? ": #{@message}" : "")
1616- end
1717-1818- def inspectable_variables
1919- super - [:@did, :@seq]
2020- end
2121- end
2222-end
-4
lib/skyfall/messages/tombstone_message.rb
···11-module Skyfall
22- class TombstoneMessage < WebsocketMessage
33- end
44-end
-4
lib/skyfall/messages/unknown_message.rb
···11-module Skyfall
22- class UnknownMessage < WebsocketMessage
33- end
44-end
-90
lib/skyfall/messages/websocket_message.rb
···11-require_relative '../errors'
22-require_relative '../extensions'
33-44-require 'cbor'
55-require 'time'
66-77-module Skyfall
88- class WebsocketMessage
99- using Skyfall::Extensions
1010-1111- require_relative 'commit_message'
1212- require_relative 'handle_message'
1313- require_relative 'info_message'
1414- require_relative 'tombstone_message'
1515- require_relative 'unknown_message'
1616-1717- attr_reader :type_object, :data_object
1818- attr_reader :type, :did, :seq
1919-2020- alias repo did
2121-2222- def self.new(data)
2323- type_object, data_object = decode_cbor_objects(data)
2424-2525- message_class = case type_object['t']
2626- when '#commit' then CommitMessage
2727- when '#handle' then HandleMessage
2828- when '#info' then InfoMessage
2929- when '#tombstone' then TombstoneMessage
3030- else UnknownMessage
3131- end
3232-3333- message = message_class.allocate
3434- message.send(:initialize, type_object, data_object)
3535- message
3636- end
3737-3838- def initialize(type_object, data_object)
3939- @type_object = type_object
4040- @data_object = data_object
4141-4242- @type = @type_object['t'][1..-1].to_sym
4343- @did = @data_object['repo'] || @data_object['did']
4444- @seq = @data_object['seq']
4545- end
4646-4747- def operations
4848- []
4949- end
5050-5151- def time
5252- @time ||= @data_object['time'] && Time.parse(@data_object['time'])
5353- end
5454-5555- def inspectable_variables
5656- instance_variables - [:@type_object, :@data_object, :@blocks]
5757- end
5858-5959- def inspect
6060- vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
6161- "#<#{self.class}:0x#{object_id} #{vars}>"
6262- end
6363-6464- private
6565-6666- def self.decode_cbor_objects(data)
6767- objects = CBOR.decode_sequence(data)
6868-6969- if objects.length < 2
7070- raise DecodeError.new("Malformed message: #{objects.inspect}")
7171- elsif objects.length > 2
7272- raise DecodeError.new("Invalid number of objects: #{objects.length}")
7373- end
7474-7575- type, data = objects
7676-7777- if data['error']
7878- raise SubscriptionError.new(data['error'], data['message'])
7979- end
8080-8181- raise DecodeError.new("Invalid object type: #{type}") unless type.is_a?(Hash)
8282- raise UnsupportedError.new("Unexpected CBOR object: #{type}") unless type['op'] == 1
8383- raise DecodeError.new("Missing data: #{type} #{objects.inspect}") unless type['op'] && type['t']
8484- raise DecodeError.new("Invalid message type: #{type['t']}") unless type['t'].start_with?('#')
8585- raise DecodeError.new("Invalid object type: #{data}") unless data.is_a?(Hash)
8686-8787- [type, data]
8888- end
8989- end
9090-end
-56
lib/skyfall/operation.rb
···11-require_relative 'collection'
22-33-module Skyfall
44- class Operation
55- def initialize(message, json)
66- @message = message
77- @json = json
88- end
99-1010- def repo
1111- @message.repo
1212- end
1313-1414- def path
1515- @json['path']
1616- end
1717-1818- def action
1919- @json['action'].to_sym
2020- end
2121-2222- def collection
2323- @json['path'].split('/')[0]
2424- end
2525-2626- def rkey
2727- @json['path'].split('/')[1]
2828- end
2929-3030- def uri
3131- "at://#{repo}/#{path}"
3232- end
3333-3434- def cid
3535- @cid ||= @json['cid'] && CID.from_cbor_tag(@json['cid'])
3636- end
3737-3838- def raw_record
3939- @raw_record ||= cid && @message.blocks.section_with_cid(cid)
4040- end
4141-4242- def type
4343- case collection
4444- when Collection::BSKY_POST then :bsky_post
4545- when Collection::BSKY_LIKE then :bsky_like
4646- when Collection::BSKY_FOLLOW then :bsky_follow
4747- when Collection::BSKY_REPOST then :bsky_repost
4848- when Collection::BSKY_BLOCK then :bsky_block
4949- when Collection::BSKY_PROFILE then :bsky_profile
5050- when Collection::BSKY_LISTITEM then :bsky_listitem
5151- when Collection::BSKY_FEED then :bsky_feed
5252- else :unknown
5353- end
5454- end
5555- end
5656-end
+352-64
lib/skyfall/stream.rb
···11-require_relative 'messages/websocket_message'
11+# frozen_string_literal: true
2233require 'eventmachine'
44require 'faye/websocket'
55require 'uri'
6677+require_relative 'errors'
88+require_relative 'events'
99+require_relative 'version'
1010+711module Skyfall
88- class Stream
99- SUBSCRIBE_REPOS = "com.atproto.sync.subscribeRepos"
10121111- NAMED_ENDPOINTS = {
1212- :subscribe_repos => SUBSCRIBE_REPOS
1313- }
1313+ # Base class of a websocket client. It provides basic websocket client functionality such as
1414+ # connecting to the service, keeping the connection alive and running lifecycle callbacks.
1515+ #
1616+ # In most cases, you will not create instances of this class directly, but rather use either
1717+ # {Firehose} or {Jetstream}. Use this class as a superclass if you need to implement some
1818+ # custom client for a websocket API that isn't supported yet.
1919+2020+ class Stream
2121+ extend Events
14221523 MAX_RECONNECT_INTERVAL = 300
16241717- attr_accessor :heartbeat_timeout, :heartbeat_interval, :cursor, :auto_reconnect
2525+ # If enabled, the client will try to reconnect if the connection is closed unexpectedly.
2626+ # (Default: true)
2727+ #
2828+ # When the reconnect attempt fails, it will wait with an exponential backoff delay before
2929+ # retrying again, up to {MAX_RECONNECT_INTERVAL} seconds.
3030+ #
3131+ # @return [Boolean]
3232+ attr_accessor :auto_reconnect
18331919- def initialize(server, endpoint, cursor = nil)
2020- @endpoint = check_endpoint(endpoint)
2121- @server = check_hostname(server)
2222- @cursor = cursor
3434+ # User agent sent in the header when connecting.
3535+ #
3636+ # Default value is {#default_user_agent} = {#version_string} `(Skyfall/x.y)`. It's recommended
3737+ # to set it or extend it with some information that indicates what service this is and who is
3838+ # running it (e.g. a Bluesky handle).
3939+ #
4040+ # @return [String]
4141+ # @example
4242+ # client.user_agent = "my.service (@my.handle) #{client.version_string}"
4343+ attr_accessor :user_agent
4444+4545+ # If enabled, runs a timer which does periodical "heatbeat checks".
4646+ #
4747+ # The heartbeat timer is started when the client connects to the service, and checks if the stream
4848+ # hasn't stalled and is still regularly sending new messages. If no messages are detected for some
4949+ # period of time, the client forces a reconnect.
5050+ #
5151+ # This is **not** enabled by default, because depending on the service you're connecting to, it
5252+ # might be normal to not receive any messages for a while.
5353+ #
5454+ # @see #heartbeat_timeout
5555+ # @see #heartbeat_interval
5656+ # @return [Boolean]
5757+ attr_accessor :check_heartbeat
5858+5959+ # Interval in seconds between heartbeat checks (default: 10). Only used if {#check_heartbeat} is set.
6060+ # @return [Numeric]
6161+ attr_accessor :heartbeat_interval
6262+6363+ # Number of seconds without messages after which reconnect is triggered (default: 300).
6464+ # Only used if {#check_heartbeat} is set.
6565+ # @return [Numeric]
6666+ attr_accessor :heartbeat_timeout
6767+6868+ # Time when the most recent message was received from the websocket.
6969+ #
7070+ # Note: this is _local time_ when the message was received; this is different from the timestamp
7171+ # of the message, which is the server time of the original source (PDS) when emitting the message,
7272+ # and different from a potential `created_at` saved in the record.
7373+ #
7474+ # @return [Time, nil]
7575+ attr_reader :last_update
7676+7777+ #
7878+ # @param server [String] Address of the server to connect to.
7979+ # Expects a string with either just a hostname, or a ws:// or wss:// URL.
8080+ #
8181+ # @raise [ArgumentError] if the server parameter is invalid
8282+ #
8383+ def initialize(server)
8484+ @root_url = build_root_url(server)
8585+2386 @handlers = {}
2424- @heartbeat_mutex = Mutex.new
2525- @heartbeat_interval = 5
2626- @heartbeat_timeout = 30
2727- @last_update = nil
2887 @auto_reconnect = true
8888+ @check_heartbeat = false
2989 @connection_attempts = 0
9090+ @heartbeat_interval = 10
9191+ @heartbeat_timeout = 300
9292+ @last_update = nil
9393+ @user_agent = default_user_agent
9494+9595+ @handlers[:error] = proc { |e| puts "ERROR: #{e}" }
3096 end
31979898+ #
9999+ # Opens a connection to the configured websocket.
100100+ #
101101+ # This method starts an EventMachine reactor on the current thread, and will only return
102102+ # once the connection is closed.
103103+ #
104104+ # @return [nil]
105105+ # @raise [ConfigError] if no message handler has been configured
106106+ # @raise [ReactorActiveError] if another stream is already running
107107+ #
32108 def connect
33109 return if @ws
34110111111+ if @handlers[:message].nil? && @handlers[:raw_message].nil?
112112+ raise ConfigError, "Either on_message or on_raw_message handler needs to be set"
113113+ end
114114+35115 url = build_websocket_url
3611637117 @handlers[:connecting]&.call(url)
118118+119119+ @reconnect_timer&.cancel
120120+ @reconnect_timer = nil
121121+122122+ raise ReactorActiveError if existing_reactor?
123123+38124 @engines_on = true
3912540126 EM.run do
···42128 @handlers[:error]&.call(e)
43129 end
441304545- @ws = Faye::WebSocket::Client.new(url)
131131+ @ws = build_websocket_client(url)
4613247133 @ws.on(:open) do |e|
48134 @handlers[:connect]&.call
135135+ @last_update = Time.now
136136+ start_heartbeat_timer
49137 end
5013851139 @ws.on(:message) do |msg|
140140+ @reconnecting = false
52141 @connection_attempts = 0
5353-5454- data = msg.data.pack('C*')
5555- @handlers[:raw_message]&.call(data)
5656-5757- if @handlers[:message]
5858- atp_message = Skyfall::WebsocketMessage.new(data)
5959- @cursor = atp_message.seq
6060- @handlers[:message].call(atp_message)
6161- else
6262- @cursor = nil
6363- end
142142+ @last_update = Time.now
143143+ handle_message(msg)
64144 end
6514566146 @ws.on(:error) do |e|
···70150 @ws.on(:close) do |e|
71151 @ws = nil
721527373- if @auto_reconnect && @engines_on
7474- EM.add_timer(reconnect_delay) do
153153+ if @reconnecting || @auto_reconnect && @engines_on
154154+ @handlers[:reconnect]&.call
155155+156156+ @reconnect_timer&.cancel
157157+ @reconnect_timer = EM::Timer.new(reconnect_delay) do
75158 @connection_attempts += 1
7676- @handlers[:reconnect]&.call
77159 connect
78160 end
79161 else
162162+ stop_heartbeat_timer
80163 @engines_on = false
81164 @handlers[:disconnect]&.call
82165 EM.stop_event_loop unless @ws
···85168 end
86169 end
87170171171+ #
172172+ # Forces a reconnect, closing the connection and calling {#connect} again.
173173+ # @return [nil]
174174+ #
175175+ def reconnect
176176+ @reconnecting = true
177177+ @connection_attempts = 0
178178+179179+ @ws ? @ws.close : connect
180180+ end
181181+182182+ #
183183+ # Closes the connection and stops the EventMachine reactor thread.
184184+ # @return [nil]
185185+ #
88186 def disconnect
89187 return unless EM.reactor_running?
90188189189+ @reconnecting = false
91190 @engines_on = false
92191 EM.stop_event_loop
93192 end
9419395194 alias close disconnect
961959797- def on_message(&block)
9898- @handlers[:message] = block
196196+ #
197197+ # Default user agent sent when connecting to the service. (Currently `"#{version_string}"`)
198198+ # @return [String]
199199+ #
200200+ def default_user_agent
201201+ version_string
99202 end
100203101101- def on_raw_message(&block)
102102- @handlers[:raw_message] = block
204204+ #
205205+ # Skyfall version string for use in user agent strings (`"Skyfall/x.y"`).
206206+ # @return [String]
207207+ #
208208+ def version_string
209209+ "Skyfall/#{Skyfall::VERSION}"
103210 end
104211105105- def on_connecting(&block)
106106- @handlers[:connecting] = block
212212+ def check_heartbeat=(value)
213213+ @check_heartbeat = value
214214+215215+ if @check_heartbeat && @engines_on && @ws && !@heartbeat_timer
216216+ start_heartbeat_timer
217217+ elsif !@check_heartbeat && @heartbeat_timer
218218+ stop_heartbeat_timer
219219+ end
107220 end
108221109109- def on_connect(&block)
110110- @handlers[:connect] = block
222222+223223+ # @!method on_connecting(block)
224224+ # Defines a callback to be run when the client tries to open a connection to the websocket.
225225+ # Can be also run as a setter `on_connecting=`.
226226+ # @param [Proc] block
227227+ # @yieldparam [String] url URL to which the client is connecting
228228+ # @return [nil]
229229+230230+ event_handler :connecting
231231+232232+ # @!method on_connect(block)
233233+ # Defines a callback to be run after a connection to the websocket is opened.
234234+ # Can be also run as a setter `on_connect=`.
235235+ # @param [Proc] block
236236+ # @return [nil]
237237+238238+ event_handler :connect
239239+240240+ # @!method on_raw_message(block)
241241+ # Defines a callback to be run when a message is received, passing a raw data packet as
242242+ # received from the websocket (plain text or binary). Can be also run as a setter `on_raw_message=`.
243243+ # @param [Proc] block
244244+ # @yieldparam [String] data payload of the received message
245245+ # @return [nil]
246246+247247+ event_handler :raw_message
248248+249249+ # @!method on_message(block)
250250+ # Defines a callback to be run when a message is received, passing the message as a parsed
251251+ # object of an appropriate message class. Can be also run as a setter `on_message=`.
252252+ # @param [Proc] block
253253+ # @yieldparam [Object] message parsed message of an appropriate class
254254+ # @return [nil]
255255+256256+ event_handler :message
257257+258258+ # @!method on_disconnect(block)
259259+ # Defines a callback to be run after a connection to the websocket is closed (and the client
260260+ # does not reconnect). Can be also run as a setter `on_disconnect=`.
261261+ #
262262+ # This callback is not run when `on_reconnect` fires.
263263+ # @param [Proc] block
264264+ # @return [nil]
265265+266266+ event_handler :disconnect
267267+268268+ # @!method on_reconnect(block)
269269+ # Defines a callback to be run when a connection to the websocket is broken, but the client
270270+ # initiates or schedules a reconnect (which may happen after a delay). Can be also run as
271271+ # a setter `on_reconnect=`.
272272+ # @param [Proc] block
273273+ # @return [nil]
274274+275275+ event_handler :reconnect
276276+277277+ # @!method on_timeout(block)
278278+ # Defines a callback to be run when the heartbeat timer forces a reconnect. A reconnect is
279279+ # triggered after not receiving any messages for a period of time specified in {#heartbeat_timeout}
280280+ # (if {#check_heartbeat} is enabled). Can be also run as a setter `on_timeout=`.
281281+ #
282282+ # This callback is also followed by `on_reconnect`.
283283+ # @param [Proc] block
284284+ # @return [nil]
285285+286286+ event_handler :timeout
287287+288288+ # @!method on_error(block)
289289+ # Defines a callback to be run when the websocket connection returns an error. Can be also
290290+ # run as a setter `on_error=`.
291291+ #
292292+ # Default handler prints the error to stdout.
293293+ #
294294+ # @param [Proc] block
295295+ # @yieldparam [Exception] error the received error
296296+ # @return [nil]
297297+298298+ event_handler :error
299299+300300+301301+ # Returns a string with a representation of the object for debugging purposes.
302302+ # @return [String]
303303+ def inspect
304304+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
305305+ "#<#{self.class}:0x#{object_id} #{vars}>"
111306 end
112307113113- def on_disconnect(&block)
114114- @handlers[:disconnect] = block
308308+309309+ protected
310310+311311+ # @note This method is designed to be overridden in subclasses.
312312+ #
313313+ # Returns the full URL of the websocket endpoint to connect to, with path and query parameters
314314+ # if needed. The base implementation simply returns the base URL passed to the initializer.
315315+ #
316316+ # Override this method in subclasses to point to the specific endpoint and add necessary
317317+ # parameters like cursor or filters, depending on the arguments passed to the constructor.
318318+ #
319319+ # @return [String]
320320+321321+ def build_websocket_url
322322+ @root_url
115323 end
116324117117- def on_error(&block)
118118- @handlers[:error] = block
325325+ # Builds and configures a websocket client object that is used to connect to the requested service.
326326+ #
327327+ # @return [Faye::WebSocket::Client]
328328+ # see {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/Client Faye::WebSocket::Client}
329329+330330+ def build_websocket_client(url)
331331+ Faye::WebSocket::Client.new(url, nil, { headers: { 'User-Agent' => user_agent }.merge(request_headers) })
119332 end
120333121121- def on_reconnect(&block)
122122- @handlers[:reconnect] = block
334334+ # @note This method is designed to be overridden in subclasses.
335335+ #
336336+ # Processes a single message received from the websocket. The implementation is expected to
337337+ # parse the message from a plain text or binary form, build an appropriate message object,
338338+ # and call the `:message` and/or `:raw_message` callback handlers, passing the right parameters.
339339+ #
340340+ # The base implementation simply takes the message data and passes it as is to `:raw_message`,
341341+ # and does not call `:message` at all.
342342+ #
343343+ # @param msg
344344+ # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent}
345345+ # @return [nil]
346346+347347+ def handle_message(msg)
348348+ data = msg.data
349349+ @handlers[:raw_message]&.call(data)
350350+ end
351351+352352+ # Additional headers to pass with the request when connecting to the websocket endpoint.
353353+ # The user agent header (built from {#user_agent}) is added separately.
354354+ #
355355+ # The base implementation returns an empty hash.
356356+ #
357357+ # @return [Hash] a hash of `{ header_name => header_value }`
358358+359359+ def request_headers
360360+ {}
361361+ end
362362+363363+ # Returns the underlying websocket client object. It can be used e.g. to send messages back
364364+ # to the server (but see also: {#send_data}).
365365+ #
366366+ # @return [Faye::WebSocket::Client]
367367+ # see {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/Client Faye::WebSocket::Client}
368368+369369+ def socket
370370+ @ws
371371+ end
372372+373373+ # Sends a message back to the server.
374374+ #
375375+ # @param data [String, Array] the message to send -
376376+ # a string for text websockets, a binary string or byte array for binary websockets
377377+ # @return [Boolean] true if the message was sent successfully
378378+379379+ def send_data(data)
380380+ @ws.send(data)
381381+ end
382382+383383+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
384384+ def inspectable_variables
385385+ instance_variables - [:@handlers, :@ws]
123386 end
124387125388126389 private
127390391391+ def existing_reactor?
392392+ EM.reactor_running? && !@engines_on
393393+ end
394394+395395+ def start_heartbeat_timer
396396+ return if !@check_heartbeat || @heartbeat_interval.to_f <= 0 || @heartbeat_timeout.to_f <= 0
397397+ return if @heartbeat_timer
398398+399399+ @heartbeat_timer = EM::PeriodicTimer.new(@heartbeat_interval) do
400400+ next if @ws.nil? || @heartbeat_timeout.to_f <= 0
401401+ time_passed = Time.now - @last_update
402402+403403+ if time_passed > @heartbeat_timeout
404404+ @handlers[:timeout]&.call
405405+ reconnect
406406+ end
407407+ end
408408+ end
409409+410410+ def stop_heartbeat_timer
411411+ @heartbeat_timer&.cancel
412412+ @heartbeat_timer = nil
413413+ end
414414+128415 def reconnect_delay
129416 if @connection_attempts == 0
130417 0
···133420 end
134421 end
135422136136- def build_websocket_url
137137- url = "wss://#{@server}/xrpc/#{@endpoint}"
138138- url += "?cursor=#{@cursor}" if @cursor
139139- url
140140- end
423423+ def build_root_url(server)
424424+ if !server.is_a?(String)
425425+ raise ArgumentError, "Server parameter should be a string"
426426+ end
141427142142- def check_endpoint(endpoint)
143143- if endpoint.is_a?(String)
144144- raise ArgumentError("Invalid endpoint name: #{endpoint}") if endpoint.strip.empty? || !endpoint.include?('.')
145145- elsif endpoint.is_a?(Symbol)
146146- raise ArgumentError("Unknown endpoint: #{endpoint}") if NAMED_ENDPOINTS[endpoint].nil?
147147- endpoint = NAMED_ENDPOINTS[endpoint]
428428+ if server.include?('://')
429429+ uri = URI(server)
430430+431431+ if uri.scheme != 'ws' && uri.scheme != 'wss'
432432+ raise ArgumentError, "Server parameter should be a hostname or a ws:// or wss:// URL"
433433+ end
434434+435435+ uri.to_s
148436 else
149149- raise ArgumentError("Endpoint should be a string or a symbol")
437437+ server = "wss://#{server}"
438438+ uri = URI(server) # raises if invalid
439439+ server
150440 end
151151-152152- endpoint
153441 end
154442155155- def check_hostname(server)
156156- if server.is_a?(String)
157157- raise ArgumentError("Invalid server name: #{server}") if server.strip.empty? || server.include?('/')
158158- else
159159- raise ArgumentError("Server name should be a string")
443443+ def ensure_empty_path(url)
444444+ url = url.chomp('/')
445445+446446+ if URI(url).path != ''
447447+ raise ArgumentError, "Server URL should only include a hostname, without any path"
160448 end
161449162162- server
450450+ url
163451 end
164452 end
165453end
+1-1
lib/skyfall/version.rb
···11# frozen_string_literal: true
2233module Skyfall
44- VERSION = "0.2.2"
44+ VERSION = "0.6.1"
55end