···00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001## [0.2.2] - 2023-09-06
23- fixed image CIDs returned in the record JSON as CBOR tag objects (they are now returned decoded to the string form)
45## [0.2.1] - 2023-08-19
67-- optimized `WebsocketMessage` parsing performance - lazy parsing of most properties (message decoding should be over 50% faster on average)
8- added separate subclasses of `WebsocketMessage` for different message types
9- added support for `#handle`, `#info` and `#tombstone` message types
10- `UnknownMessage` is returned for unrecognized message types
···12## [0.2.0] - 2023-07-24
1314- switched the websocket library from `websocket-client-simple` to `faye-websocket`, which should make event parsing up to ~30ร faster (!)
15-- added `auto_reconnect` property to `Stream` (on by default) - if true, it will try to reconnect with an exponential backoff when the websocket disconnects, until you call `Stream#disconnect`
1617Note:
1819-- calling `sleep` is no longer needed after connecting - call `connect` on a new thread instead to get previously default behavior of running the event loop asynchronously
20- the disconnect event no longer passes an error object in the argument
21-- there is currently no "heartbeat" feature as in 0.1.x that checks for a stuck connection - but it doesn't seem to be needed
2223## [0.1.3] - 2023-07-04
24
···1+## Unreleased
2+3+The main change in this version is that inline YARD documentation has been added. This was also a good opportunity to review some APIs and tweak some things in order to get Skyfall a bit closer to a 1.0.
4+5+New APIs:
6+7+- added `Skyfall::Jetstream::CommitMessage#operation` (aliased as `op`) which returns the (always single) operation in the `operations` array
8+- added `#kind` as alias for `#type` in both `Message` classes
9+- added a base class for error types, `Skyfall::Error`
10+- added `#blocks` to `Skyfall::Firehose::SyncMessage`
11+- added `#rev`, `#since` and `#prev_data` to `Skyfall::Firehose::CommitMessage`
12+13+Deprecated & removed APIs:
14+15+- removed deprecated `HandleMessage` and `TombstoneMessage` message classes
16+- removed deprecated `CommitMessage#prev`
17+- deprecated `#path` in both `Operation` classes
18+19+Optimizations:
20+21+- much faster `Skyfall::Firehose::Message#time` parsing on Ruby 3.2+
22+- lazy decoding of sections in `CarArchive` โ saves quite a lot of work if sections are only accessed through `Operation#raw_record`
23+- added `frozen_string_literal: true` in all files to reduce garbage collection
24+25+Access level changes:
26+27+- restricted `Stream#start_heartbeat_timer` & `Stream#stop_heartbeat_timer` methods access to private
28+- restricted `Stream#handle_message` method access to protected
29+- restricted `Stream#last_update` to read-only access
30+- restricted `#inspectable_variables` methods access to either private or protected
31+- relaxed `Stream#build_websocket_url` & `Stream#build_websocket_client` methods access from private to protected
32+- fixed private class method `Skyfall::Firehose::Message.decode_cbor_objects` which wasn't actually private
33+34+Additional validations and other changes:
35+36+- `Stream#connect` throws an error if neither `on_message` nor `on_raw_message` handlers have been configured
37+- `Message` subclasses do additional checks if the fields they require to not be nil aren't nil
38+- `Message` subclasses raise an error if `.new` is called on a subclass (and not on the base `Message`) passing the data of a wrong kind of message (instead of returning e.g. a `CommitMessage` from `AccountMessage.new` as it worked previously)
39+- made `LabelsMessage` a subclass of `Firehose::Message`
40+- fixed the `require`s config in some files so they can be loaded in any order
41+42+43+## [0.6.1] - 2026-01-08
44+45+- added `:bsky_notif_declaration` shortcode for `app.bsky.notification.declaration` collection
46+- throw error when trying to run two streams in one process (see b4a1514f5da28983205765e55724b5c4abe6c5e4 for details)
47+- added protected `#send_data` and `#socket` methods in `Stream` for use in `Stream` subclasses (currently for the Tapfall gem)
48+- added a way to customize headers sent when connecting in `Stream` subclasses through the `#request_headers` method
49+50+## [0.6.0] - 2025-06-25
51+52+- significantly speeded up reading of events from the binary firehose (`Skyfall::Firehose`) โ up to 4-5x faster than before
53+- removed the `Skyfall::Stream.new` constructor deprecated in 0.5.0
54+55+## [0.5.1] - 2025-05-18
56+57+- added support for the new `#sync` message type
58+- added `#unknown?` helper in `Skyfall::Firehose::Message` and `Skyfall::Jetstream::Message`, which returns true if the message type is `UnknownMessage`
59+- added `:bsky_verification` and `:bsky_actor_status` lexicon type and shortcode
60+- added one missing require
61+62+## [0.5.0] - 2024-11-15
63+64+Jetstream support! You can now connect to [Jetstream](https://github.com/bluesky-social/jetstream) sources using `Skyfall::Jetstream` (see readme).
65+66+This required some breaking changes in the existing API:
67+68+- `Skyfall::Stream` has been renamed to `Skyfall::Firehose`, `Skyfall::Stream` is now a base class of both `Firehose` and `Jetstream`; the existing `Skyfall::Stream` constructor works for now but will be removed soon
69+- `Skyfall::WebsocketMessage` and its subclasses have been separated into two parallel families under `Skyfall::Firehose` and `Skyfall::Jetstream`, with the base classes just named `Message`
70+- same thing happened with `Skyfall::Operation`
71+- `data_object` and `type_object` properties in `WebsocketMessage` are considered semi-private API now ("nodoc")
72+73+In most cases, you should only need to update the `Skyfall::Stream` class name in the constructor. If you've referenced message classes like `Skyfall::CommitMessage` directly, it's probably better to just check the `#type` property instead.
74+75+Also, small change to the user agent API: `Skyfall::Stream` now has an additional metod `version_string`, which will always return `Skyfall/0.x.y` โ it's recommended to use that instead of `default_user_agent` to build your own user agent string that includes the library version. `default_user_agent` now passes through to `version_string`, but it could be changed in future to return something else.
76+77+## [0.4.1] - 2024-10-04
78+79+- performance fix โ don't decode CAR sections which aren't needed, which is most of them; this cuts the amount of memory that GC has to free up by about one third, and should speed up processing by around ~10%
80+81+## [0.4.0] - 2024-09-23
82+83+- (re)added a "hearbeat" feature (removed earlier in 0.2.0) to fix the occasional issue when the websocket stops receiving data, but doesn't disconnect (not enabled by default, turn it on by setting `check_heartbeat` to true)
84+- added a way to set the user agent sent when connecting using the `user_agent` field (default is `"Skyfall/#{version}"`)
85+- added `app.bsky.feed.postgate` record type
86+87+## [0.3.1] - 2024-06-28
88+89+- added `app.bsky.graph.starterpack` and `chat.bsky.actor.declaration` record types
90+- added `#account` event type (`AccountMessage`)
91+- added `handle` field to `IdentityMessage`
92+- fixed param validation on `Stream` initialization
93+- reverted the change that added Ruby stdlib dependencies explicitly to the gemspec, since this causes more problems than it's worth โ only `base64` is left there, since it's the one now required to be listed
94+95+## [0.3.0] - 2024-03-21
96+97+- added support for labeller firehose, served by labeller services at the `com.atproto.label.subscribeLabels` endpoint (aliased as `:subscribe_labels`)
98+- the `#labels` messages from the labeller firehose are parsed into a `LabelsMessage`, which includes a `labels` array of `Label` objects
99+- `Stream` callbacks can now also be assigned via setters, e.g. `stream.on_message = proc { ... }`
100+- added default error handler to `Stream` which logs the error to `$stdout` โ set `stream.on_error = nil` to disable
101+- added Ruby stdlib dependencies explicitly to the gemspec โ fixes a warning in Ruby 3.3 when requiring `base64`, which will be extracted as an optional gem in 3.4
102+103+## [0.2.5] - 2024-03-14
104+105+- added `:bsky_labeler` record type symbol & collection constant
106+107+## [0.2.4] - 2024-02-27
108+109+- added support for `#identity` message type
110+- added `Operation#did` as an alias of `#repo`
111+- added `Stream#reconnect` method which forces the websocket to reconnect
112+- added some validation for the `cursor` parameter in `Stream` initializer
113+- the `server` parameter in `Stream` initializer can be a full URL with scheme, which lets you connect to e.g. `ws://localhost` (since by default, `wss://` is used)
114+- tweaked `#inspect` output of `Stream` and `Operation`
115+116+## [0.2.3] - 2023-09-28
117+118+- fixed encoding of image CIDs again (they should be wrapped in a `$link` object)
119+- binary strings are now correctly returned as `$bytes` objects
120+- added `list`, `listblock` and `threadgate` to record type symbols and collection constants
121+122## [0.2.2] - 2023-09-06
123124- fixed image CIDs returned in the record JSON as CBOR tag objects (they are now returned decoded to the string form)
125126## [0.2.1] - 2023-08-19
127128+- optimized `WebsocketMessage` parsing performance โ lazy parsing of most properties (message decoding should be over 50% faster on average)
129- added separate subclasses of `WebsocketMessage` for different message types
130- added support for `#handle`, `#info` and `#tombstone` message types
131- `UnknownMessage` is returned for unrecognized message types
···133## [0.2.0] - 2023-07-24
134135- switched the websocket library from `websocket-client-simple` to `faye-websocket`, which should make event parsing up to ~30ร faster (!)
136+- added `auto_reconnect` property to `Stream` (on by default) โ if true, it will try to reconnect with an exponential backoff when the websocket disconnects, until you call `Stream#disconnect`
137138Note:
139140+- calling `sleep` is no longer needed after connecting โ call `connect` on a new thread instead to get previously default behavior of running the event loop asynchronously
141- the disconnect event no longer passes an error object in the argument
142+- there is currently no "heartbeat" feature as in 0.1.x that checks for a stuck connection โ but it doesn't seem to be needed
143144## [0.1.3] - 2023-07-04
145
···1The zlib License
23-Copyright (c) 2023 Jakub Suder
45This software is provided 'as-is', without any express or implied
6warranty. In no event will the authors be held liable for any damages
···1The zlib License
23+Copyright (c) 2026 Jakub Suder
45This software is provided 'as-is', without any express or implied
6warranty. In no event will the authors be held liable for any damages
+240-23
README.md
···1# Skyfall
23-๐ค A Ruby gem for streaming data from the Bluesky/AtProto firehose ๐ฆ
000456## What does it do
78-Skyfall is a Ruby library for connecting to the *"firehose"* of the Bluesky social network, i.e. a websocket which
9-streams all new posts and everything else happening on the Bluesky network in real time. The code connects to the
10-websocket endpoint, decodes the messages which are encoded in some binary formats like DAG-CBOR, and returns the data as Ruby objects, which you can filter and save to some kind of database (e.g. in order to create a custom feed).
111213## Installation
1415- gem install skyfall
00000000161718## Usage
1920-Start a connection to the firehose by creating a `Skyfall::Stream` object, passing the server hostname and endpoint name:
002122```rb
23require 'skyfall'
2425-sky = Skyfall::Stream.new('bsky.social', :subscribe_repos)
26```
2728-Add event listeners to handle incoming messages and get notified of errors:
002930```rb
0000000031sky.on_connect { puts "Connected" }
32sky.on_disconnect { puts "Disconnected" }
003334-sky.on_message { |m| p m }
35sky.on_error { |e| puts "ERROR: #{e}" }
00000036```
3738When you're ready, open the connection by calling `connect`:
···41sky.connect
42```
43000000000000000000000000000000000000000000000000004445### Processing messages
4647-Each message passed to `on_message` is an instance of the `WebsocketMessage` class and has such properties:
000000000000000004849-- `type` (symbol) - usually `:commit`
50-- `seq` (sequential number)
51-- `time` (Time)
52-- `repo` (string) - DID of the repository (user account)
53- `commit` - CID of the commit
54-- `prev` - CID of the previous commit in that repo
55- `operations` - list of operations (usually one)
5657-Operations are objects of type `Operation` and have such properties:
00000000000005859-- `repo` (string) - DID of the repository (user account)
000060- `collection` (string) - name of the relevant collection in the repository, e.g. `app.bsky.feed.post` for posts
0061- `path` (string) - the path part of the at:// URI - collection name + ID (rkey) of the item
062- `action` (symbol) - `:create`, `:update` or `:delete`
63-- `uri` (string) - the at:// URI
64-- `type` (symbol) - short name of the collection, e.g. `:bsky_post`
65-- `cid` - CID of the operation/record (`nil` for delete operations)
6667-Create and update operations will also have an attached record (JSON object) with details of the post, like etc. The record data is currently available as a Ruby hash via `raw_record` property (custom types will be added in a later version).
6869So for example, in order to filter only "create post" operations and print their details, you can do something like this:
70···82end
83```
8485-See complete example in [example/firehose.rb](https://github.com/mackuba/skyfall/blob/master/example/firehose.rb).
000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000868788## Credits
8990-Copyright ยฉ 2023 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/mackuba.eu)).
9192The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
93
···1# Skyfall
23+A Ruby gem for streaming data from the Bluesky/ATProto firehose ๐ฆ
4+5+> [!NOTE]
6+> Part of ATProto Ruby SDK: [ruby.sdk.blue](https://ruby.sdk.blue)
789## What does it do
1011+Skyfall is a Ruby library for connecting to the *"[firehose](https://atproto.com/specs/event-stream)"* of the Bluesky social network, i.e. a websocket which streams all new posts and everything else happening on the Bluesky network in real time. The code connects to the websocket endpoint, decodes the messages which are encoded in some binary formats like DAG-CBOR, and returns the data as Ruby objects, which you can filter and save to some kind of database (e.g. in order to create a custom feed).
12+13+Since version 0.5, Skyfall also supports connecting to [Jetstream](https://github.com/bluesky-social/jetstream/) sources, which serve the same kind of stream, but as JSON messages instead of CBOR.
141516## Installation
1718+To use Skyfall, you need a reasonably new version of Ruby โ it should run on Ruby 2.6 and above, although it's recommended to use a version that's still getting maintainance updates, i.e. currently 3.2+. A compatible version should be preinstalled on macOS Big Sur and above and on many Linux systems. Otherwise, you can install one using tools such as [RVM](https://rvm.io), [asdf](https://asdf-vm.com), [ruby-install](https://github.com/postmodern/ruby-install) or [ruby-build](https://github.com/rbenv/ruby-build), or `rpm` or `apt-get` on Linux (see more installation options on [ruby-lang.org](https://www.ruby-lang.org/en/downloads/)).
19+20+To install the gem, run the command:
21+22+ [sudo] gem install skyfall
23+24+Or add this to your app's `Gemfile`:
25+26+ gem 'skyfall', '~> 0.6'
272829## Usage
3031+### Standard ATProto firehose
32+33+To connect to the firehose, start by creating a `Skyfall::Firehose` object, specifying the server hostname and endpoint name:
3435```rb
36require 'skyfall'
3738+sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos)
39```
4041+The server name can be just a hostname, or a full URL with a `ws:` or `wss:` scheme, which is useful if you want to use a non-encrypted websocket connection, e.g. `"ws://localhost:8000"`. The endpoint can be either a full NSID string like `"com.atproto.sync.subscribeRepos"`, or one of the defined symbol shortcuts - you will almost always want to pass `:subscribe_repos` here.
42+43+Next, set up event listeners to handle incoming messages and get notified of errors. Here are all the available listeners (you will need at least either `on_message` or `on_raw_message`):
4445```rb
46+# this gives you a parsed message object, one of subclasses of Skyfall::Firehose::Message
47+sky.on_message { |msg| p msg }
48+49+# this gives you raw binary data as received from the websocket
50+sky.on_raw_message { |data| p data }
51+52+# lifecycle events
53+sky.on_connecting { |url| puts "Connecting to #{url}..." }
54sky.on_connect { puts "Connected" }
55sky.on_disconnect { puts "Disconnected" }
56+sky.on_reconnect { puts "Connection lost, trying to reconnect..." }
57+sky.on_timeout { puts "Connection stalled, triggering a reconnect..." }
5859+# handling errors (there's a default error handler that does exactly this)
60sky.on_error { |e| puts "ERROR: #{e}" }
61+```
62+63+You can also call these as setters accepting a `Proc` - e.g. to disable default error handling, you can do:
64+65+```rb
66+sky.on_error = nil
67```
6869When you're ready, open the connection by calling `connect`:
···72sky.connect
73```
7475+The `#connect` method blocks until the connection is explicitly closed with `#disconnect` from an event or interrupt handler. Skyfall uses [EventMachine](https://github.com/eventmachine/eventmachine) under the hood, so in order to run some things in parallel, you can use e.g. `EM::PeriodicTimer`.
76+77+78+### Using a Jetstream source
79+80+Alternatively, you can connect to a [Jetstream](https://github.com/bluesky-social/jetstream/) server. Jetstream is a firehose proxy that lets you stream data as simple JSON instead, which uses much less bandwidth, and allows you to pick only a subset of events that you're interested in, e.g. only posts or only from specific accounts. (See the [configuration section](#jetstream-filters) for more info on Jetstream filtering.)
81+82+Jetstream connections are made using a `Skyfall::Jetstream` instance, which has more or less the same API as `Skyfall::Firehose`, so it should be possible to switch between those by just changing the line that creates the client instance:
83+84+```rb
85+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network')
86+87+sky.on_message { |msg| ... }
88+sky.on_error { |e| ... }
89+sky.on_connect { ... }
90+...
91+92+sky.connect
93+```
94+95+### Cursors
96+97+ATProto websocket endpoints implement a "*cursor*" feature to help you make sure that you don't miss anything if your connection is down for a bit (because of a network issue, server restart, deploy etc.). Each message includes a `seq` field, which is the sequence number of the event. You can keep track of the last seq you've seen, and when you reconnect, you pass that number as a cursor parameter - the server will then "replay" all events you might have missed since that last one. (The `bsky.network` Relay firehose currently has a buffer of about 72 hours, though that's not something required by specification.)
98+99+To use a cursor when connecting to the firehose, pass it as the third parameter to `Skyfall::Firehose`. You should then regularly save the `seq` of the last event to some permanent storage, and then load it from there when reconnecting.
100+101+A full-network firehose sends many hundreds of events per second, so depending on your use case, it might be enough if you save it every n events (e.g. every 100 or 1000) and on clean shutdown:
102+103+```rb
104+cursor = load_cursor
105+106+sky = Skyfall::Firehose.new('bsky.network', :subscribe_repos, cursor)
107+sky.on_message do |msg|
108+ save_cursor(msg.seq) if msg.seq % 1000 == 0
109+ process_message(msg)
110+end
111+```
112+113+Jetstream has a similar mechanism, except the cursor is the event's timestamp in Unix time microseconds instead of just a number incrementing by 1. For `Skyfall::Jetstream`, pass the cursor as a key in an options hash:
114+115+```rb
116+cursor = load_cursor
117+118+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', { cursor: cursor })
119+sky.on_message do |msg|
120+ save_cursor(msg.seq)
121+ process_message(msg)
122+end
123+```
124+125126### Processing messages
127128+Each message passed to `on_message` is an instance of a subclass of either `Skyfall::Firehose::Message` or `Skyfall::Jetstream::Message`, depending on the selected source. The supported message types are:
129+130+- `CommitMessage` (`#commit`) - represents a change in a user's repo; most messages are of this type
131+- `IdentityMessage` (`#identity`) - notifies about a change in user's DID document, e.g. a handle change or a migration to a new PDS
132+- `AccountMessage` (`#account`) - notifies about a change of an account's status (de/activation, suspension, deletion)
133+- `SyncMessage` (`#sync`) - updates repository state, can be used to trigger account resynchronization
134+- `LabelsMessage` (`#labels`) - only used in `subscribe_labels` endpoint
135+- `InfoMessage` (`#info`) - a protocol error message, e.g. about an invalid cursor parameter
136+- `UnknownMessage` is used for other unrecognized message types
137+138+`Skyfall::Firehose::Message` and `Skyfall::Jetstream::Message` variants of message classes should have more or less the same interface, except when a given field is not included in one of the formats.
139+140+All message objects have the following shared properties:
141+142+- `type` (symbol) - the message type identifier, e.g. `:commit`
143+- `seq` (integer) - a sequential index of the message; Jetstream messages instead have a `time_us` value, which is a Unix timestamp in microseconds (also aliased as `seq` for compatibility)
144+- `repo` or `did` (string) - DID of the repository (user account)
145+- `time` (Time) - timestamp of the described action
146147+All properties except `type` may be nil for some message types that aren't related to a specific user, like `#info`.
148+149+Commit messages additionally have:
150+151- `commit` - CID of the commit
0152- `operations` - list of operations (usually one)
153154+Handle and Identity messages additionally have:
155+156+- `handle` - the new handle assigned to the DID
157+158+Account messages additionally have:
159+160+- `active?` - whether the account is active, or inactive for any reason
161+- `status` - if not active, shows the status of the account (`:deactivated`, `:deleted`, `:takendown`)
162+163+Info messages additionally have:
164+165+- `name` - identifier of the message/error
166+- `message` - a human-readable description
167+168169+### Commit operations
170+171+Operations are objects of type `Skyfall::Firehose::Operation` or `Skyfall::Jetstream::Operation` and have such properties:
172+173+- `repo` or `did` (string) - DID of the repository (user account)
174- `collection` (string) - name of the relevant collection in the repository, e.g. `app.bsky.feed.post` for posts
175+- `type` (symbol) - short name of the collection, e.g. `:bsky_post`
176+- `rkey` (string) - identifier of a record in a collection
177- `path` (string) - the path part of the at:// URI - collection name + ID (rkey) of the item
178+- `uri` (string) - the complete at:// URI
179- `action` (symbol) - `:create`, `:update` or `:delete`
180+- `cid` (CID) - CID of the operation/record (`nil` for delete operations)
00181182+Create and update operations will also have an attached record (JSON object) with details of the post, like etc. The record data is currently available as a Ruby hash via `raw_record` property (custom types will be added in future).
183184So for example, in order to filter only "create post" operations and print their details, you can do something like this:
185···197end
198```
199200+For more examples, see the [examples page](https://ruby.sdk.blue/examples/) on [ruby.sdk.blue](https://ruby.sdk.blue), or the [bluesky-feeds-rb](https://tangled.org/mackuba.eu/bluesky-feeds-rb/blob/master/app/firehose_stream.rb) project, which implements a feed generator service.
201+202+203+### Note on custom lexicons
204+205+Note that the `Operation` objects have two properties that tell you the kind of record they're about: `#collection`, which is a string containing the official name of the collection/lexicon, e.g. `"app.bsky.feed.post"`; and `#type`, which is a symbol meant to save you some typing, e.g. `:bsky_post`.
206+207+When Skyfall receives a message about a record type that's not on the list, whether in the `app.bsky` namespace or not, the operation `type` will be `:unknown`, while the `collection` will be the original string. So if an app like e.g. "Skygram" appears with a `zz.skygram.*` namespace that lets you share photos on ATProto, the operations will have a type `:unknown` and collection names like `zz.skygram.feed.photo`, and you can check the `collection` field for record types known to you and process them in some appropriate way, even if Skyfall doesn't recognize the record type.
208+209+Do not however check if such operations have a `type` equal to `:unknown` first - just ignore the type and only check the `collection` string. The reason is that some next version of Skyfall might start recognizing those records and add a new `type` value for them like e.g. `:skygram_photo`, and then they won't match your condition anymore.
210+211+212+## Reconnection logic
213+214+In a perfect world, the websocket would never disconnect until you disconnect it, but unfortunately we don't live in a perfect world. The socket sometimes disconnects or stops responding, and Skyfall has some built-in protections to make sure it can operate without much oversight.
215+216+217+### Broken connections
218+219+If the connection is randomly closed for some reason, Skyfall will by default try to reconnect automatically. If the reconnection fails (e.g. because the network is down), it will wait with an [exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) up to 5 minute intervals and keep retrying forever until it connects again. The `on_reconnect` callback is triggered when the connection is closed (before the wait delay). This mechanism should generally solve most of the problem.
220+221+The auto reconnecting feature is enabled by default, but you can turn it off by setting `auto_reconnect` to `false`.
222+223+### Stalled connections & heartbeat
224+225+Occasionally, especially during times of very heavy traffic, the websocket can get into a stuck state where it stops receiving any data, but doesn't disconnect and just hangs like this forever. To work around this, there is a "heartbeat" feature which starts a background timer, which periodically checks how much time has passed since the last received event, and if the time exceeds a set limit, it manually disconnects and reconnects the stream.
226+227+This feature is not enabled by default, because there are some firehoses which will not be sending events often, possibly only once in a while โ e.g. labellers and independent PDS firehoses โ and in this case we don't want any heartbeat since it will be completely normal not to have any events for a long time. It's not really possible to detect easily if we're connecting to a full network relay or one of those, so in order to avoid false alarms, you need to enable this manually using the `check_heartbeat` property.
228+229+You can also change the `heartbeat_interval`, i.e. how often the timer is triggered (default: 10s), and the `heartbeat_timeout`, i.e. the amount of time passed without events needed to cause a reconnect (default: 5 min):
230+231+```rb
232+sky.check_heartbeat = true
233+sky.heartbeat_interval = 5
234+sky.heartbeat_timeout = 120
235+```
236+237+### Cursors when reconnecting
238+239+Skyfall keeps track of the last event's `seq` internally in the `cursor` property, so if the client reconnects for whatever reason, it will automatically use the latest cursor in the URL.
240+241+> [!NOTE]
242+> This only happens if you use the `on_message` callback and not `on_raw_message`, since the event is not parsed from binary data into a `Message` object if you use `on_raw_message`, so Skyfall won't have access to the `seq` field then.
243+244+245+## Streaming from labellers
246+247+Apart from `subscribe_repos`, there is a second endpoint `subscribe_labels`, which is used to stream labels from [labellers](https://atproto.com/specs/label) (ATProto moderation services). This endpoint only sends `#labels` events (and possibly `#info`).
248+249+To connect to a labeller, pass `:subscribe_labels` as the endpoint name to `Skyfall::Firehose`. The `on_message` callback will get called with `Skyfall::Firehose::LabelsMessage` events, each of which includes one or more labels as `Skyfall::Label`:
250+251+```rb
252+cursor = load_cursor(service)
253+sky = Skyfall::Firehose.new(service, :subscribe_labels, cursor)
254+sky.on_message do |msg|
255+ if msg.type == :labels
256+ msg.labels.each do |l|
257+ puts "[#{l.created_at}] #{l.subject} => #{l.value}"
258+ end
259+ end
260+end
261+```
262+263+See [ATProto label docs](https://atproto.com/specs/label) for info on what fields are included with each label - `Skyfall::Label` includes properties with these original names, and also more friendly aliases for each (e.g. `value` instead of `val`).
264+265+266+## Other configuration
267+268+### User agent
269+270+Skyfall sends a user agent header when making a connection. This is set by default to `"Skyfall/0.x.y"`, but it's recommended that you override it using the `user_agent` field to something that identifies your app and its author โ this will let the owner of the server you're connecting to know who to contact in case the client is causing some problems.
271+272+You can also append your user agent info to the default value like this:
273+274+```rb
275+sky.user_agent = "NewsBot (@news.bot) #{sky.version_string}"
276+```
277+278+### Jetstream filters
279+280+Jetstream allows you to specify [filters](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#consuming-jetstream) of collection types and/or tracked DIDs when you connect, so it will send you only the events you're interested in. You can e.g. ask only for posts and ignore likes, or only profile events and ignore everything else, or only listen for posts from a few specific accounts.
281+282+To use these filters, pass the "wantedCollections" and/or "wantedDids" parameters in the options hash when initializing `Skyfall::Jetstream`. You can use the original JavaScript param names, or a more Ruby-like snake_case form:
283+284+```rb
285+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
286+ wanted_collections: 'app.bsky.feed.post',
287+ wanted_dids: @dids
288+})
289+```
290+291+For collections, you can also use the symbol codes used in `Operation#type`, e.g. `:bsky_post`:
292+293+```rb
294+sky = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
295+ wanted_collections: [:bsky_post]
296+})
297+```
298+299+See [Jetstream docs](https://github.com/bluesky-social/jetstream?tab=readme-ov-file#consuming-jetstream) for more info on available filters.
300+301+> [!NOTE]
302+> The `compress` and `requireHello` options (and zstd compression) are not available at the moment. Also the "subscriber sourced messages" aren't implemented yet.
303304305## Credits
306307+Copyright ยฉ 2026 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).
308309The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
310
···1+# frozen_string_literal: true
2+3module Skyfall
4+5+ #
6+ # This module defines constants for known Bluesky record collection types, and a mapping of those
7+ # names to symbol short codes which can be used as shorthand when processing events or in
8+ # Jetstream filters.
9+ #
10+11 module Collection
12+ BSKY_PROFILE = "app.bsky.actor.profile"
13+ BSKY_ACTOR_STATUS = "app.bsky.actor.status"
14+ BSKY_FEED = "app.bsky.feed.generator"
15+ BSKY_LIKE = "app.bsky.feed.like"
16+ BSKY_POST = "app.bsky.feed.post"
17+ BSKY_POSTGATE = "app.bsky.feed.postgate"
18+ BSKY_REPOST = "app.bsky.feed.repost"
19+ BSKY_THREADGATE = "app.bsky.feed.threadgate"
20+ BSKY_BLOCK = "app.bsky.graph.block"
21+ BSKY_FOLLOW = "app.bsky.graph.follow"
22+ BSKY_LIST = "app.bsky.graph.list"
23+ BSKY_LISTBLOCK = "app.bsky.graph.listblock"
24+ BSKY_LISTITEM = "app.bsky.graph.listitem"
25+ BSKY_STARTERPACK = "app.bsky.graph.starterpack"
26+ BSKY_VERIFICATION = "app.bsky.graph.verification"
27+ BSKY_LABELER = "app.bsky.labeler.service"
28+29+ BSKY_NOTIF_DECLARATION = "app.bsky.notification.declaration"
30+ BSKY_CHAT_DECLARATION = "chat.bsky.actor.declaration"
31+32+ # Mapping of NSID collection names to symbol short codes
33+34+ SHORT_CODES = {
35+ BSKY_ACTOR_STATUS => :bsky_actor_status,
36+ BSKY_BLOCK => :bsky_block,
37+ BSKY_FEED => :bsky_feed,
38+ BSKY_FOLLOW => :bsky_follow,
39+ BSKY_LABELER => :bsky_labeler,
40+ BSKY_LIKE => :bsky_like,
41+ BSKY_LIST => :bsky_list,
42+ BSKY_LISTBLOCK => :bsky_listblock,
43+ BSKY_LISTITEM => :bsky_listitem,
44+ BSKY_POST => :bsky_post,
45+ BSKY_POSTGATE => :bsky_postgate,
46+ BSKY_PROFILE => :bsky_profile,
47+ BSKY_REPOST => :bsky_repost,
48+ BSKY_STARTERPACK => :bsky_starterpack,
49+ BSKY_THREADGATE => :bsky_threadgate,
50+ BSKY_VERIFICATION => :bsky_verification,
51+ BSKY_CHAT_DECLARATION => :bsky_chat_declaration,
52+ BSKY_NOTIF_DECLARATION => :bsky_notif_declaration
53+ }
54+55+ # Returns a symbol short code for a given collection NSID, or `:unknown`
56+ # if NSID is not on the list.
57+ # @param collection [String] collection NSID
58+ # @return [Symbol] short code or :unknown
59+60+ def self.short_code(collection)
61+ SHORT_CODES[collection] || :unknown
62+ end
63+64+ # Returns a collection NSID assigned to a given short code symbol, if one is defined.
65+ # @param code [Symbol] one of the symbols listed in {SHORT_CODES}
66+ # @return [String, nil] assigned NSID string, or nil when code is not known
67+68+ def self.from_short_code(code)
69+ SHORT_CODES.detect { |k, v| v == code }&.first
70+ end
71 end
72end
+58-4
lib/skyfall/errors.rb
···001module Skyfall
2- class DecodeError < StandardError
0003 end
45- class UnsupportedError < StandardError
0006 end
78- class SubscriptionError < StandardError
9- attr_reader :error_type, :error_message
0001000000000000000000000000000000000000011 def initialize(error_type, error_message = nil)
12 @error_type = error_type
13 @error_message = error_message
1415 super("Subscription error: #{error_type}" + (error_message ? " (#{error_message})" : ""))
16 end
000000017 end
18end
···1+# frozen_string_literal: true
2+3module Skyfall
4+ #
5+ # Wrapper base class for Skyfall error classes.
6+ #
7+ class Error < StandardError
8 end
910+ #
11+ # Raised when some code is not configured or configured incorrectly.
12+ #
13+ class ConfigError < Error
14 end
1516+ #
17+ # Raised when some part of the message being decoded has invalid format.
18+ #
19+ class DecodeError < Error
20+ end
2122+ #
23+ # Raised when {Stream#connect} is called and there's already another instance of {Stream} or its
24+ # subclass like {Firehose} that's connected to another websocket.
25+ #
26+ # This is currently not supported in Skyfall, because it uses EventMachine behind the scenes, which
27+ # runs everything on a single "reactor" thread, and there can be only one such reactor thread in
28+ # a given process. In theory, it should be possible for two connections to run inside a single
29+ # shared EventMachine event loop, but it would require some more coordination and it might have
30+ # unexpected side effects - e.g. synchronous work (including I/O and network requests) done during
31+ # processing of an event from one connection would be blocking the other connection.
32+ #
33+ class ReactorActiveError < Error
34+ def initialize
35+ super(
36+ "An EventMachine reactor thread is already running, but it seems to have been launched by another Stream. " +
37+ "Skyfall doesn't currently support running two different Stream instances in a single process."
38+ )
39+ end
40+ end
41+42+ #
43+ # Raised when the server sends a message which is formatted correctly, but describes some kind of
44+ # error condition that the server has detected.
45+ #
46+ class SubscriptionError < Error
47+48+ # @return [String] a short machine-readable error code
49+ attr_reader :error_type
50+51+ # @return [String] a human-readable error message
52+ attr_reader :error_message
53+54+ #
55+ # @param error_type [String] a short machine-readable error code
56+ # @param error_message [String, nil] a human-readable error message
57+ #
58 def initialize(error_type, error_message = nil)
59 @error_type = error_type
60 @error_message = error_message
6162 super("Subscription error: #{error_type}" + (error_message ? " (#{error_message})" : ""))
63 end
64+ end
65+66+ #
67+ # Raised when the server sends a message which is formatted correctly, but written in a version
68+ # that's not supported by this library.
69+ #
70+ class UnsupportedError < Error
71 end
72end
+19
lib/skyfall/events.rb
···0000000000000000000
···1+# frozen_string_literal: true
2+3+module Skyfall
4+5+ # @private
6+ module Events
7+ protected
8+9+ def event_handler(name)
10+ define_method("on_#{name}") do |&block|
11+ @handlers[name.to_sym] = block
12+ end
13+14+ define_method("on_#{name}=") do |block|
15+ @handlers[name.to_sym] = block
16+ end
17+ end
18+ end
19+end
···1+# frozen_string_literal: true
2+3+require_relative '../firehose'
4+require_relative 'message'
5+6+module Skyfall
7+8+ #
9+ # Firehose message sent when the status of an account changes. This can be:
10+ #
11+ # - an account being created, sending its initial state (should be active)
12+ # - an account being deactivated or suspended
13+ # - an account being restored back to an active state from deactivation/suspension
14+ # - an account being deleted (the status returning `:deleted`)
15+ #
16+17+ class Firehose::AccountMessage < Firehose::Message
18+19+ #
20+ # @private
21+ # @param type_object [Hash] first decoded CBOR frame with metadata
22+ # @param data_object [Hash] second decoded CBOR frame with payload
23+ # @raise [DecodeError] if the message doesn't include required data
24+ #
25+ def initialize(type_object, data_object)
26+ super
27+ check_if_not_nil 'seq', 'did', 'time', 'active'
28+29+ @active = @data_object['active']
30+ @status = @data_object['status']&.to_sym
31+ end
32+33+ # @return [Boolean] true if the account is active, false if it's deactivated/suspended etc.
34+ def active?
35+ @active
36+ end
37+38+ # @return [Symbol, nil] for inactive accounts, specifies the exact state; nil for active accounts
39+ attr_reader :status
40+ end
41+end
···1+# frozen_string_literal: true
2+3+require_relative '../car_archive'
4+require_relative '../cid'
5+require_relative '../firehose'
6+require_relative 'message'
7+require_relative 'operation'
8+9+module Skyfall
10+11+ #
12+ # Firehose message which includes one or more operations on records in the repo (a record was
13+ # created, updated or deleted). In most cases this is a single record operation.
14+ #
15+ # Most of the messages received from the firehose are of this type, and this is the type you
16+ # will usually be most interested in.
17+ #
18+19+ class Firehose::CommitMessage < Firehose::Message
20+21+ #
22+ # @private
23+ # @param type_object [Hash] first decoded CBOR frame with metadata
24+ # @param data_object [Hash] second decoded CBOR frame with payload
25+ # @raise [DecodeError] if the message doesn't include required data
26+ #
27+ def initialize(type_object, data_object)
28+ super
29+ check_if_not_nil 'seq', 'repo', 'commit', 'blocks', 'ops', 'time', 'rev'
30+ end
31+32+ # @return [String] current revision of the repo
33+ def rev
34+ @data_object['rev']
35+ end
36+37+ # @return [String, nil] revision of the previous commit in the repo
38+ def since
39+ @data_object['since']
40+ end
41+42+ # @return [CID, nil] CID (Content Identifier) of data of the previous commit in the repo
43+ def prev_data
44+ @prev_data ||= CID.from_cbor_tag(@data_object['prevData'])
45+ end
46+47+ # @return [CID] CID (Content Identifier) of the commit
48+ def commit
49+ @commit ||= CID.from_cbor_tag(@data_object['commit'])
50+ end
51+52+ # @return [Skyfall::CarArchive] commit data in the form of a parsed CAR archive
53+ def blocks
54+ @blocks ||= CarArchive.new(@data_object['blocks'])
55+ end
56+57+ # @return [Array<Firehose::Operation>] record operations (usually one) included in the commit
58+ def operations
59+ @operations ||= @data_object['ops'].map { |op| Firehose::Operation.new(self, op) }
60+ end
61+62+ # Looks up record data assigned to a given operation in the commit's CAR archive.
63+ # @param op [Firehose::Operation]
64+ # @return [Hash, nil]
65+ def raw_record_for_operation(op)
66+ op.cid && blocks.section_with_cid(op.cid)
67+ end
68+ end
69+end
+37
lib/skyfall/firehose/identity_message.rb
···0000000000000000000000000000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../firehose'
4+require_relative 'message'
5+6+module Skyfall
7+8+ #
9+ # Firehose message sent when a new DID is created or when the details of someone's DID document
10+ # are changed (usually either a handle change or a migration to a different PDS). The message
11+ # may include currently assigned handle, though it's not required that this field is set.
12+ #
13+ # Note: the message is originally emitted from the account's PDS and is passed as is by relays,
14+ # which means you can't fully trust that the handle is actually correctly assigned to the DID
15+ # and verified by DNS or well-known. To confirm that, use `DID.resolve_handle` from
16+ # [DIDKit](https://ruby.sdk.blue/didkit/).
17+ #
18+19+ class Firehose::IdentityMessage < Firehose::Message
20+21+ #
22+ # @private
23+ # @param type_object [Hash] first decoded CBOR frame with metadata
24+ # @param data_object [Hash] second decoded CBOR frame with payload
25+ # @raise [DecodeError] if the message doesn't include required data
26+ #
27+ def initialize(type_object, data_object)
28+ super
29+ check_if_not_nil 'seq', 'did', 'time'
30+31+ @handle = @data_object['handle']
32+ end
33+34+ # @return [String, nil] current handle assigned to the DID
35+ attr_reader :handle
36+ end
37+end
···1+# frozen_string_literal: true
2+3+require_relative '../firehose'
4+require_relative 'message'
5+6+module Skyfall
7+8+ #
9+ # An informational firehose message from the websocket service itself, unrelated to any repos.
10+ #
11+ # Currently there is only one type of message defined, `"OutdatedCursor"`, which is sent when
12+ # the client connects with a cursor that is older than the oldest event currently kept in the
13+ # backfill buffer. This message means that you're likely missing some events that were sent
14+ # since the last time the client was connected but which were already deleted from the buffer.
15+ #
16+ # Note: the {#did}, {#seq} and {#time} properties are always `nil` for `#info` messages.
17+ #
18+19+ class Firehose::InfoMessage < Firehose::Message
20+21+ # @return [String] short machine-readable code of the info message
22+ attr_reader :name
23+24+ # @return [String, nil] a human-readable description
25+ attr_reader :message
26+27+ # Message which means that the cursor passed when connecting is older than the oldest event
28+ # currently kept in the backfill buffer, and that you've likely missed some events that have
29+ # already been deleted
30+ OUTDATED_CURSOR = "OutdatedCursor"
31+32+ #
33+ # @private
34+ # @param type_object [Hash] first decoded CBOR frame with metadata
35+ # @param data_object [Hash] second decoded CBOR frame with payload
36+ # @raise [DecodeError] if the message doesn't include required data
37+ #
38+ def initialize(type_object, data_object)
39+ super
40+ check_if_not_nil 'name'
41+42+ @name = @data_object['name']
43+ @message = @data_object['message']
44+ end
45+46+ # @return [String] a formatted summary
47+ def to_s
48+ (@name || "InfoMessage") + (@message ? ": #{@message}" : "")
49+ end
50+51+ protected
52+53+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
54+ def inspectable_variables
55+ super - [:@did, :@seq]
56+ end
57+ end
58+end
+41
lib/skyfall/firehose/labels_message.rb
···00000000000000000000000000000000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../firehose'
4+require_relative '../label'
5+require_relative 'message'
6+7+module Skyfall
8+9+ #
10+ # A message which includes one or more labels (as {Skyfall::Label}). This type of message
11+ # is only sent from a `:subscribe_labels` firehose from a labeller service.
12+ #
13+ # Note: the {#did} and {#time} properties are always `nil` for `#labels` messages.
14+ #
15+16+ class Firehose::LabelsMessage < Firehose::Message
17+18+ # @return [Array<Skyfall::Label>] labels included in the batch
19+ attr_reader :labels
20+21+ #
22+ # @private
23+ # @param type_object [Hash] first decoded CBOR frame with metadata
24+ # @param data_object [Hash] second decoded CBOR frame with payload
25+ # @raise [DecodeError] if the message doesn't include required data
26+ #
27+ def initialize(type_object, data_object)
28+ super
29+ check_if_not_nil 'seq', 'labels'
30+31+ @labels = @data_object['labels'].map { |x| Label.new(x) }
32+ end
33+34+ protected
35+36+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
37+ def inspectable_variables
38+ super - [:@did]
39+ end
40+ end
41+end
···1+# frozen_string_literal: true
2+3+require_relative '../errors'
4+require_relative '../extensions'
5+require_relative '../firehose'
6+7+require 'cbor'
8+require 'time'
9+10+module Skyfall
11+12+ # @abstract
13+ # Abstract base class representing a CBOR firehose message.
14+ #
15+ # Actual messages are returned as instances of one of the subclasses of this class,
16+ # depending on the type of message, most commonly as {Skyfall::Firehose::CommitMessage}.
17+ #
18+ # The {new} method is overridden here so that it can be called with a binary data message
19+ # from the websocket, and it parses the type from the appropriate frame and builds an
20+ # instance of a matching subclass.
21+ #
22+ # You normally don't need to call this class directly, unless you're building a custom
23+ # subclass of {Skyfall::Stream}, or reading raw data packets from the websocket through
24+ # the {Skyfall::Stream#on_raw_message} event handler.
25+26+ class Firehose::Message
27+ using Skyfall::Extensions
28+29+ # Type of the message (e.g. `:commit`, `:identity` etc.)
30+ # @return [Symbol]
31+ attr_reader :type
32+33+ # DID of the account (repo) that the event is sent by.
34+ # @return [String, nil]
35+ attr_reader :did
36+37+ # Sequential number of the message, to be used as a cursor when reconnecting.
38+ # @return [Integer, nil]
39+ attr_reader :seq
40+41+ alias repo did
42+ alias kind type
43+44+ # First of the two CBOR objects forming the message payload, which mostly just includes the type field.
45+ # @api private
46+ # @return [Hash]
47+ attr_reader :type_object
48+49+ # Second of the two CBOR objects forming the message payload, which contains the rest of the data.
50+ # @api private
51+ # @return [Hash]
52+ attr_reader :data_object
53+54+ #
55+ # Parses the CBOR objects from the binary data and returns an instance of an appropriate subclass.
56+ #
57+ # {Skyfall::Firehose::UnknownMessage} is returned if the message type is not recognized.
58+ #
59+ # @param data [String] binary payload of a firehose websocket message
60+ # @return [Skyfall::Firehose::Message]
61+ # @raise [Skyfall::DecodeError] if the structure of the message is invalid
62+ # @raise [Skyfall::UnsupportedError] if the message has an unknown future version
63+ # @raise [Skyfall::SubscriptionError] if the data contains an error message from the server
64+ #
65+ def self.new(data)
66+ type_object, data_object = decode_cbor_objects(data)
67+68+ message_class = case type_object['t']
69+ when '#account' then Firehose::AccountMessage
70+ when '#commit' then Firehose::CommitMessage
71+ when '#identity' then Firehose::IdentityMessage
72+ when '#info' then Firehose::InfoMessage
73+ when '#labels' then Firehose::LabelsMessage
74+ when '#sync' then Firehose::SyncMessage
75+ else Firehose::UnknownMessage
76+ end
77+78+ if self != Firehose::Message && self != message_class
79+ expected_type = self.name.split('::').last.gsub(/Message$/, '').downcase
80+ raise DecodeError, "Expected ##{expected_type} message, got #{type_object['t']}"
81+ end
82+83+ message = message_class.allocate
84+ message.send(:initialize, type_object, data_object)
85+ message
86+ end
87+88+ #
89+ # @private
90+ # @param type_object [Hash] first decoded CBOR frame with metadata
91+ # @param data_object [Hash] second decoded CBOR frame with payload
92+ #
93+ def initialize(type_object, data_object)
94+ @type_object = type_object
95+ @data_object = data_object
96+97+ @type = @type_object['t'][1..-1].to_sym
98+ @did = @data_object['repo'] || @data_object['did']
99+ @seq = @data_object['seq']
100+ end
101+102+ #
103+ # List of operations on records included in the message. Only `#commit` messages include
104+ # operations, but for convenience the method is declared here and returns an empty array
105+ # in other messages.
106+ # @return [Array<Firehose::Operation>]
107+ #
108+ def operations
109+ []
110+ end
111+112+ #
113+ # @return [Boolean] true if the message is {Firehose::UnknownMessage} (of unrecognized type)
114+ #
115+ def unknown?
116+ self.is_a?(Firehose::UnknownMessage)
117+ end
118+119+ #
120+ # Timestamp decoded from the message.
121+ #
122+ # Note: this represents the time when the message was emitted from the original PDS, which
123+ # might differ a lot from the `created_at` time saved in the record data, e.g. if user's local
124+ # time is set incorrectly, or if an archive of existing posts was imported from another platform.
125+ #
126+ # @return [Time, nil]
127+ #
128+ def time
129+ @time ||= @data_object['time'] && Time.iso8601(@data_object['time'])
130+ end
131+132+ # Much faster version for Ruby 3.2+
133+134+ if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.2')
135+ def time
136+ @time ||= @data_object['time'] && Time.new(@data_object['time'])
137+ end
138+ end
139+140+ # Returns a string with a representation of the object for debugging purposes.
141+ # @return [String]
142+ def inspect
143+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
144+ "#<#{self.class}:0x#{object_id} #{vars}>"
145+ end
146+147+148+ protected
149+150+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
151+ def inspectable_variables
152+ instance_variables - [:@type_object, :@data_object, :@blocks]
153+ end
154+155+156+ private
157+158+ # Note: this method is written this way as an optimization
159+ def check_if_not_nil(a, b = nil, c = nil, d = nil, e = nil, f = nil, g = nil)
160+ ok = @data_object.has_key?(a)
161+ ok &&= @data_object.has_key?(b) if b
162+ ok &&= @data_object.has_key?(c) if c
163+ ok &&= @data_object.has_key?(d) if d
164+ ok &&= @data_object.has_key?(e) if e
165+ ok &&= @data_object.has_key?(f) if f
166+ ok &&= @data_object.has_key?(g) if g
167+168+ if !ok
169+ expected_fields = [a, b, c, d, e, f, g].compact
170+ missing_fields = expected_fields.select { |x| @data_object[x].nil? }
171+ raise DecodeError.new("Missing event details (#{missing_fields.map(&:to_s).join(', ')})")
172+ end
173+ end
174+175+ def self.decode_cbor_objects(data)
176+ objects = CBOR.decode_sequence(data)
177+178+ if objects.length < 2
179+ raise DecodeError.new("Malformed message: #{objects.inspect}")
180+ elsif objects.length > 2
181+ raise DecodeError.new("Invalid number of objects: #{objects.length}")
182+ end
183+184+ type, data = objects
185+186+ if data['error']
187+ raise SubscriptionError.new(data['error'], data['message'])
188+ end
189+190+ raise DecodeError.new("Invalid object type: #{type.inspect}") unless type.is_a?(Hash)
191+ raise DecodeError.new("Missing data: #{type.inspect}") unless type['op'] && type['t']
192+ raise DecodeError.new("Invalid object type: #{type['op'].inspect}") unless type['op'].is_a?(Integer)
193+ raise DecodeError.new("Invalid object type: #{type['t'].inspect}") unless type['t'].is_a?(String)
194+ raise DecodeError.new("Invalid message type: #{type['t'].inspect}") unless type['t'].start_with?('#')
195+ raise UnsupportedError.new("Unsupported version: #{type['op']}") unless type['op'] == 1
196+ raise DecodeError.new("Invalid object type: #{data.inspect}") unless data.is_a?(Hash)
197+198+ [type, data]
199+ end
200+201+ private_class_method :decode_cbor_objects
202+ end
203+end
204+205+# need to be at the end because of a circular dependency
206+207+require_relative 'account_message'
208+require_relative 'commit_message'
209+require_relative 'identity_message'
210+require_relative 'info_message'
211+require_relative 'labels_message'
212+require_relative 'sync_message'
213+require_relative 'unknown_message'
···1+# frozen_string_literal: true
2+3+require_relative '../collection'
4+require_relative '../firehose'
5+6+module Skyfall
7+8+ #
9+ # A single record operation from a firehose commit event. An operation is a new record being
10+ # created, or an existing record modified or deleted. It includes the URI and other details of
11+ # the record in question, type of the action taken, and record data for "created" and "update"
12+ # actions.
13+ #
14+ # Note: when a record is deleted, the previous record data is *not* included in the commit, only
15+ # its URI. This means that if you're tracking records which are referencing other records, e.g.
16+ # follow, block, or like records, you need to store information about this referencing record
17+ # including an URI or rkey, because in case of a delete, you will not get information about which
18+ # post was unliked or which account was unfollowed, only which like/follow record was deleted.
19+ #
20+ # At the moment, Skyfall doesn't parse the record data into any rich models specific for a given
21+ # record type with a convenient API, but simply returns them as `Hash` objects (see {#raw_record}).
22+ # In the future, a separate `#record` method might be added which returns a parsed record model.
23+ #
24+25+ class Firehose::Operation
26+27+ #
28+ # @param message [Skyfall::Firehose::Message] commit message the operation is included in
29+ # @param json [Hash] operation data
30+ #
31+ def initialize(message, json)
32+ @message = message
33+ @json = json
34+ end
35+36+ # @return [String] DID of the account/repository in which the operation happened
37+ def repo
38+ @message.repo
39+ end
40+41+ alias did repo
42+43+ # @return [String] path part of the record URI (collection + rkey)
44+ # @deprecated Use {#collection} + {#rkey}
45+ def path
46+ @@path_warning_printed ||= false
47+48+ unless @@path_warning_printed
49+ $stderr.puts "Warning: Skyfall::Firehose::Operation#path is deprecated - use #collection + #rkey"
50+ @@path_warning_printed = true
51+ end
52+53+ @json['path']
54+ end
55+56+ # @return [Symbol] type of the operation (`:create`, `:update` or `:delete`)
57+ def action
58+ @json['action'].to_sym
59+ end
60+61+ # @return [String] record collection NSID
62+ def collection
63+ @json['path'].split('/')[0]
64+ end
65+66+ # @return [String] record rkey
67+ def rkey
68+ @json['path'].split('/')[1]
69+ end
70+71+ # @return [String] full AT URI of the record
72+ def uri
73+ "at://#{repo}/#{@json['path']}"
74+ end
75+76+ # @return [CID, nil] CID (Content Identifier) of the record (nil for delete operations)
77+ def cid
78+ @cid ||= @json['cid'] && CID.from_cbor_tag(@json['cid'])
79+ end
80+81+ # @return [Hash, nil] record data as a plain Ruby Hash (nil for delete operations)
82+ def raw_record
83+ @raw_record ||= @message.raw_record_for_operation(self)
84+ end
85+86+ # Symbol short code of the collection, like `:bsky_post`. If the collection NSID is not
87+ # recognized, the type is `:unknown`. The full NSID is always available through the
88+ # `#collection` property.
89+ #
90+ # @return [Symbol]
91+ # @see Skyfall::Collection
92+ #
93+ def type
94+ Collection.short_code(collection)
95+ end
96+97+ # Returns a string with a representation of the object for debugging purposes.
98+ # @return [String]
99+ def inspect
100+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
101+ "#<#{self.class}:0x#{object_id} #{vars}>"
102+ end
103+104+ private
105+106+ def inspectable_variables
107+ instance_variables - [:@message]
108+ end
109+ end
110+end
+40
lib/skyfall/firehose/sync_message.rb
···0000000000000000000000000000000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../car_archive'
4+require_relative '../firehose'
5+require_relative 'message'
6+7+module Skyfall
8+9+ #
10+ # Firehose message which declares the current state of the repository. The message is meant to
11+ # trigger a resynchronization of the repository from a receiving consumer, if the consumer detects
12+ # from the message rev that it must have missed some events from that repository.
13+ #
14+ # The sync message can be emitted by a PDS or relay to force a repair of a broken account state,
15+ # or e.g. when an account is created, migrated or recovered from a CAR backup.
16+ #
17+18+ class Firehose::SyncMessage < Firehose::Message
19+20+ #
21+ # @private
22+ # @param type_object [Hash] first decoded CBOR frame with metadata
23+ # @param data_object [Hash] second decoded CBOR frame with payload
24+ # @raise [DecodeError] if the message doesn't include required data
25+ #
26+ def initialize(type_object, data_object)
27+ super
28+ check_if_not_nil 'seq', 'did', 'blocks', 'rev', 'time'
29+ end
30+31+ def rev
32+ @rev ||= @data_object['rev']
33+ end
34+35+ # @return [Skyfall::CarArchive] commit data in the form of a parsed CAR archive
36+ def blocks
37+ @blocks ||= CarArchive.new(@data_object['blocks'])
38+ end
39+ end
40+end
+14
lib/skyfall/firehose/unknown_message.rb
···00000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../firehose'
4+require_relative 'message'
5+6+module Skyfall
7+8+ #
9+ # Firehose message of an unrecognized type.
10+ #
11+12+ class Firehose::UnknownMessage < Firehose::Message
13+ end
14+end
···1+# frozen_string_literal: true
2+3+require_relative 'stream'
4+require 'uri'
5+6+module Skyfall
7+8+ #
9+ # Client of a standard AT Protocol firehose websocket.
10+ #
11+ # This is the main Skyfall class to use to connect to a CBOR-based firehose
12+ # websocket endpoint like `subscribeRepos` (on a PDS or a relay).
13+ #
14+ # To connect to the firehose, you need to:
15+ #
16+ # * create an instance of {Firehose}, passing it the hostname/URL of the server,
17+ # name of the endpoint (normally `:subscribe_repos`) and optionally a cursor
18+ # * set up callbacks to be run when connecting, disconnecting, when a message
19+ # is received etc. (you need to set at least a message handler)
20+ # * call {#connect} to start the connection
21+ # * handle the received messages (instances of a {Skyfall::Firehose::Message}
22+ # subclass)
23+ #
24+ # @example
25+ # client = Skyfall::Firehose.new('bsky.network', :subscribe_repos, last_cursor)
26+ # # or: client = Skyfall::Firehose.new('bsky.network', last_cursor)
27+ #
28+ # client.on_message do |msg|
29+ # next unless msg.type == :commit
30+ #
31+ # msg.operations.each do |op|
32+ # if op.type == :bsky_post && op.action == :create
33+ # puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}"
34+ # end
35+ # end
36+ # end
37+ #
38+ # client.connect
39+ #
40+ # # You might also want to set some or all of these lifecycle callback handlers:
41+ #
42+ # client.on_connecting { |url| puts "Connecting to #{url}..." }
43+ # client.on_connect { puts "Connected" }
44+ # client.on_disconnect { puts "Disconnected" }
45+ # client.on_reconnect { puts "Connection lost, trying to reconnect..." }
46+ # client.on_timeout { puts "Connection stalled, triggering a reconnect..." }
47+ # client.on_error { |e| puts "ERROR: #{e}" }
48+ #
49+ # @note Most of the methods of this class that you might want to use are defined in {Skyfall::Stream}.
50+ #
51+52+ class Firehose < Stream
53+54+ # the main firehose endpoint on a PDS or relay
55+ SUBSCRIBE_REPOS = "com.atproto.sync.subscribeRepos"
56+57+ # only used with moderation services (labellers)
58+ SUBSCRIBE_LABELS = "com.atproto.label.subscribeLabels"
59+60+ NAMED_ENDPOINTS = {
61+ :subscribe_repos => SUBSCRIBE_REPOS,
62+ :subscribe_labels => SUBSCRIBE_LABELS
63+ }
64+65+ # Current cursor (seq of the last seen message)
66+ # @return [Integer, nil]
67+ attr_accessor :cursor
68+69+ #
70+ # @overload initialize(server, endpoint, cursor = nil)
71+ # Returns a new instance of a firehose client connecting to a given endpoint.
72+ #
73+ # @param server [String]
74+ # Address of the server to connect to.
75+ # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.
76+ # @param endpoint [Symbol, String]
77+ # XRPC method name.
78+ # Pass either a full NSID, or a symbol shorthand from {NAMED_ENDPOINTS}
79+ # @param cursor [Integer, String, nil]
80+ # sequence number from which to resume
81+ # @raise [ArgumentError] if any of the parameters is invalid
82+ #
83+ # @overload initialize(server, cursor = nil)
84+ # Returns a new instance of a firehose client connecting to `subscribeRepos`.
85+ #
86+ # @param server [String]
87+ # Address of the server to connect to.
88+ # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.
89+ # @param cursor [Integer, String, nil]
90+ # sequence number from which to resume
91+ # @raise [ArgumentError] if any of the parameters is invalid
92+ #
93+94+ def initialize(server, endpoint = nil, cursor = nil)
95+ require_relative 'firehose/message'
96+ super(server)
97+98+ if cursor.nil? && (endpoint.nil? || endpoint.to_s =~ /\A\d+\z/)
99+ cursor = endpoint
100+ endpoint = :subscribe_repos
101+ end
102+103+ @endpoint = check_endpoint(endpoint)
104+ @cursor = check_cursor(cursor)
105+ @root_url = ensure_empty_path(@root_url)
106+ end
107+108+109+ protected
110+111+ # Returns the full URL of the websocket endpoint to connect to.
112+ # @return [String]
113+114+ def build_websocket_url
115+ @root_url + "/xrpc/" + @endpoint + (@cursor ? "?cursor=#{@cursor}" : "")
116+ end
117+118+ # Processes a single message received from the websocket. Passes the received data to the
119+ # {#on_raw_message} handler, builds a {Skyfall::Firehose::Message} object, and passes it to
120+ # the {#on_message} handler (if defined). Also updates the {#cursor} to this message's sequence
121+ # number (note: this is skipped if {#on_message} is not set).
122+ #
123+ # @param msg
124+ # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent}
125+ # @return [nil]
126+127+ def handle_message(msg)
128+ data = msg.data
129+ @handlers[:raw_message]&.call(data)
130+131+ if @handlers[:message]
132+ atp_message = Message.new(data)
133+ @cursor = atp_message.seq
134+ @handlers[:message].call(atp_message)
135+ else
136+ @cursor = nil
137+ end
138+ end
139+140+141+ private
142+143+ def check_cursor(cursor)
144+ if cursor.nil?
145+ nil
146+ elsif cursor.is_a?(Integer) || cursor.is_a?(String) && cursor =~ /^[0-9]+$/
147+ cursor.to_i
148+ else
149+ raise ArgumentError, "Invalid cursor: #{cursor.inspect} - cursor must be an integer number"
150+ end
151+ end
152+153+ def check_endpoint(endpoint)
154+ if endpoint.is_a?(String)
155+ raise ArgumentError.new("Invalid endpoint name: #{endpoint}") if endpoint.strip == '' || !endpoint.include?('.')
156+ elsif endpoint.is_a?(Symbol)
157+ raise ArgumentError.new("Unknown endpoint: #{endpoint}") if NAMED_ENDPOINTS[endpoint].nil?
158+ endpoint = NAMED_ENDPOINTS[endpoint]
159+ else
160+ raise ArgumentError, "Endpoint should be a string or a symbol"
161+ end
162+163+ endpoint
164+ end
165+ end
166+end
+39
lib/skyfall/jetstream/account_message.rb
···000000000000000000000000000000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../errors'
4+require_relative '../jetstream'
5+require_relative 'message'
6+7+module Skyfall
8+9+ #
10+ # Jetstream message sent when the status of an account changes. This can be:
11+ #
12+ # - an account being created, sending its initial state (should be active)
13+ # - an account being deactivated or suspended
14+ # - an account being restored back to an active state from deactivation/suspension
15+ # - an account being deleted (the status returning `:deleted`)
16+ #
17+18+ class Jetstream::AccountMessage < Jetstream::Message
19+20+ #
21+ # @param json [Hash] message JSON decoded from the websocket message
22+ # @raise [DecodeError] if the message doesn't include required data
23+ #
24+ def initialize(json)
25+ raise DecodeError.new("Missing event details (account)") if json['account'].nil? || json['account']['active'].nil?
26+ super
27+ end
28+29+ # @return [Boolean] true if the account is active, false if it's deactivated/suspended etc.
30+ def active?
31+ @json['account']['active']
32+ end
33+34+ # @return [Symbol, nil] for inactive accounts, specifies the exact state; nil for active accounts
35+ def status
36+ @json['account']['status']&.to_sym
37+ end
38+ end
39+end
···1+# frozen_string_literal: true
2+3+require_relative '../errors'
4+require_relative '../jetstream'
5+require_relative 'message'
6+require_relative 'operation'
7+8+module Skyfall
9+10+ #
11+ # Jetstream message which includes a single operation on a record in the repo (a record was
12+ # created, updated or deleted). Most of the messages received from Jetstream are of this type,
13+ # and this is the type you will usually be most interested in.
14+ #
15+16+ class Jetstream::CommitMessage < Jetstream::Message
17+18+ #
19+ # @param json [Hash] message JSON decoded from the websocket message
20+ # @raise [DecodeError] if the message doesn't include required data
21+ #
22+ def initialize(json)
23+ raise DecodeError.new("Missing event details (commit)") if json['commit'].nil?
24+25+ %w(collection rkey operation).each { |f| raise DecodeError.new("Missing event details (#{f})") if json['commit'][f].nil? }
26+27+ super
28+ end
29+30+ # Returns the record operation included in the commit.
31+ # @return [Jetstream::Operation]
32+ #
33+ def operation
34+ @operation ||= Jetstream::Operation.new(self, json['commit'])
35+ end
36+37+ alias op operation
38+39+ # Returns record operations included in the commit. Currently a `:commit` message from
40+ # Jetstream always includes exactly one operation, but for compatibility with
41+ # {Skyfall::Firehose}'s API it's also returned in an array here.
42+ #
43+ # @return [Array<Jetstream::Operation>]
44+ #
45+ def operations
46+ [operation]
47+ end
48+ end
49+end
+36
lib/skyfall/jetstream/identity_message.rb
···000000000000000000000000000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../errors'
4+require_relative '../jetstream'
5+require_relative 'message'
6+7+module Skyfall
8+9+ #
10+ # Jetstream message sent when a new DID is created or when the details of someone's DID document
11+ # are changed (usually either a handle change or a migration to a different PDS). The message
12+ # should include currently assigned handle (though the field is not required).
13+ #
14+ # Note: the message is originally emitted from the account's PDS and is passed as is by relays,
15+ # which means you can't fully trust that the handle is actually correctly assigned to the DID
16+ # and verified by DNS or well-known. To confirm that, use `DID.resolve_handle` from
17+ # [DIDKit](https://ruby.sdk.blue/didkit/).
18+ #
19+20+ class Jetstream::IdentityMessage < Jetstream::Message
21+22+ #
23+ # @param json [Hash] message JSON decoded from the websocket message
24+ # @raise [DecodeError] if the message doesn't include required data
25+ #
26+ def initialize(json)
27+ raise DecodeError.new("Missing event details (identity)") if json['identity'].nil?
28+ super
29+ end
30+31+ # @return [String, nil] current handle assigned to the DID
32+ def handle
33+ @json['identity']['handle']
34+ end
35+ end
36+end
···1+# frozen_string_literal: true
2+3+require_relative '../errors'
4+require_relative '../jetstream'
5+6+require 'time'
7+8+module Skyfall
9+10+ # @abstract
11+ # Abstract base class representing a Jetstream message.
12+ #
13+ # Actual messages are returned as instances of one of the subclasses of this class,
14+ # depending on the type of message, most commonly as {Skyfall::Jetstream::CommitMessage}.
15+ #
16+ # The {new} method is overridden here so that it can be called with a JSON message from
17+ # the websocket, and it parses the type from the JSON and builds an instance of a matching
18+ # subclass.
19+ #
20+ # You normally don't need to call this class directly, unless you're building a custom
21+ # subclass of {Skyfall::Stream} or reading raw data packets from the websocket through
22+ # the {Skyfall::Stream#on_raw_message} event handler.
23+24+ class Jetstream::Message
25+26+ # Type of the message (e.g. `:commit`, `:identity` etc.)
27+ # @return [Symbol]
28+ attr_reader :type
29+30+ # DID of the account (repo) that the event is sent by
31+ # @return [String]
32+ attr_reader :did
33+34+ # Server timestamp of the message (in Unix time microseconds), which serves as a cursor
35+ # when reconnecting; an equivalent of {Skyfall::Firehose::Message#seq} in CBOR firehose
36+ # messages.
37+ # @return [Integer]
38+ attr_reader :time_us
39+40+ alias repo did
41+ alias seq time_us
42+ alias kind type
43+44+ # The raw JSON of the message as parsed from the websocket packet.
45+ attr_reader :json
46+47+ #
48+ # Parses the JSON data from a websocket message and returns an instance of an appropriate subclass.
49+ #
50+ # {Skyfall::Jetstream::UnknownMessage} is returned if the message type is not recognized.
51+ #
52+ # @param data [String] plain text payload of a Jetstream websocket message
53+ # @return [Skyfall::Jetstream::Message]
54+ # @raise [DecodeError] if the message doesn't include required data
55+ #
56+ def self.new(data)
57+ json = JSON.parse(data)
58+59+ message_class = case json['kind']
60+ when 'account' then Jetstream::AccountMessage
61+ when 'commit' then Jetstream::CommitMessage
62+ when 'identity' then Jetstream::IdentityMessage
63+ else Jetstream::UnknownMessage
64+ end
65+66+ if self != Jetstream::Message && self != message_class
67+ expected_type = self.name.split('::').last.gsub(/Message$/, '').downcase
68+ raise DecodeError, "Expected '#{expected_type}' message, got '#{json['kind']}'"
69+ end
70+71+ message = message_class.allocate
72+ message.send(:initialize, json)
73+ message
74+ end
75+76+ #
77+ # @param json [Hash] message JSON decoded from the websocket message
78+ # @raise [DecodeError] if the message doesn't include required data
79+ #
80+ def initialize(json)
81+ %w(kind did time_us).each { |f| raise DecodeError.new("Missing event details (#{f})") if json[f].nil? }
82+83+ @json = json
84+ @type = @json['kind'].to_sym
85+ @did = @json['did']
86+ @time_us = @json['time_us']
87+ end
88+89+ #
90+ # @return [Boolean] true if the message is {Jetstream::UnknownMessage} (of unrecognized type)
91+ #
92+ def unknown?
93+ self.is_a?(Jetstream::UnknownMessage)
94+ end
95+96+ # Returns a record operation included in the message. Only `:commit` messages include
97+ # operations, but for convenience the method is declared here and returns nil in other messages.
98+ #
99+ # @return [nil]
100+ #
101+ def operation
102+ nil
103+ end
104+105+ alias op operation
106+107+ # List of operations on records included in the message. Only `:commit` messages include
108+ # operations, but for convenience the method is declared here and returns an empty array
109+ # in other messages.
110+ #
111+ # @return [Array<Jetstream::Operation>]
112+ #
113+ def operations
114+ []
115+ end
116+117+ #
118+ # Timestamp decoded from the message.
119+ #
120+ # Note: the time is read from the {#time_us} field, which stores the event time as an integer in
121+ # Unix time microseconds, and which is used as an equivalent of {Skyfall::Firehose::Message#seq}
122+ # in CBOR firehose messages. This timestamp represents the time when the message was received
123+ # and stored by Jetstream, which might differ a lot from the `created_at` time saved in the
124+ # record data, e.g. if user's local time is set incorrectly or if an archive of existing posts
125+ # was imported from another platform. It will also differ (usually only slightly) from the
126+ # timestamp of the original CBOR message emitted from the PDS and passed through the relay.
127+ #
128+ # @return [Time]
129+ #
130+ def time
131+ @time ||= Time.at(@time_us / 1_000_000.0)
132+ end
133+ end
134+end
135+136+# need to be at the end because of a circular dependency
137+138+require_relative 'account_message'
139+require_relative 'commit_message'
140+require_relative 'identity_message'
141+require_relative 'unknown_message'
···1+# frozen_string_literal: true
2+3+require_relative '../collection'
4+require_relative '../jetstream'
5+6+module Skyfall
7+8+ #
9+ # A single record operation from a Jetstream commit event. An operation is a new record being
10+ # created, or an existing record modified or deleted. It includes the URI and other details of
11+ # the record in question, type of the action taken, and record data for "created" and "update"
12+ # actions.
13+ #
14+ # Note: when a record is deleted, the previous record data is *not* included in the commit, only
15+ # its URI. This means that if you're tracking records which are referencing other records, e.g.
16+ # follow, block, or like records, you need to store information about this referencing record
17+ # including an URI or rkey, because in case of a delete, you will not get information about which
18+ # post was unliked or which account was unfollowed, only which like/follow record was deleted.
19+ #
20+ # At the moment, Skyfall doesn't parse the record data into any rich models specific for a given
21+ # record type with a convenient API, but simply returns them as `Hash` objects (see {#raw_record}).
22+ # In the future, a second `#record` method might be added which returns a parsed record model.
23+ #
24+25+ class Jetstream::Operation
26+27+ #
28+ # @param message [Skyfall::Jetstream::Message] commit message the operation is parsed from
29+ # @param json [Hash] operation data
30+ #
31+ def initialize(message, json)
32+ @message = message
33+ @json = json
34+ end
35+36+ # @return [String] DID of the account/repository in which the operation happened
37+ def repo
38+ @message.repo
39+ end
40+41+ alias did repo
42+43+ # @return [String] path part of the record URI (collection + rkey)
44+ # @deprecated Use {#collection} + {#rkey}
45+ def path
46+ @@path_warning_printed ||= false
47+48+ unless @@path_warning_printed
49+ $stderr.puts "Warning: Skyfall::Jetstream::Operation#path is deprecated - use #collection + #rkey"
50+ @@path_warning_printed = true
51+ end
52+53+ @json['collection'] + '/' + @json['rkey']
54+ end
55+56+ # @return [Symbol] type of the operation (`:create`, `:update` or `:delete`)
57+ def action
58+ @json['operation'].to_sym
59+ end
60+61+ # @return [String] record collection NSID
62+ def collection
63+ @json['collection']
64+ end
65+66+ # @return [String] record rkey
67+ def rkey
68+ @json['rkey']
69+ end
70+71+ # @return [String] full AT URI of the record
72+ def uri
73+ "at://#{repo}/#{collection}/#{rkey}"
74+ end
75+76+ # @return [CID, nil] CID (Content Identifier) of the record (nil for delete operations)
77+ def cid
78+ @cid ||= @json['cid'] && CID.from_json(@json['cid'])
79+ end
80+81+ # @return [Hash, nil] record data as a plain Ruby Hash (nil for delete operations)
82+ def raw_record
83+ @json['record']
84+ end
85+86+ # Symbol short code of the collection, like `:bsky_post`. If the collection NSID is not
87+ # recognized, the type is `:unknown`. The full NSID is always available through the
88+ # `#collection` property.
89+ #
90+ # @return [Symbol]
91+ # @see Skyfall::Collection
92+ #
93+ def type
94+ Collection.short_code(collection)
95+ end
96+97+ # Returns a string with a representation of the object for debugging purposes.
98+ # @return [String]
99+ def inspect
100+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
101+ "#<#{self.class}:0x#{object_id} #{vars}>"
102+ end
103+104+ private
105+106+ def inspectable_variables
107+ instance_variables - [:@message]
108+ end
109+ end
110+end
+14
lib/skyfall/jetstream/unknown_message.rb
···00000000000000
···1+# frozen_string_literal: true
2+3+require_relative '../jetstream'
4+require_relative 'message'
5+6+module Skyfall
7+8+ #
9+ # Jetstream message of an unrecognized type.
10+ #
11+12+ class Jetstream::UnknownMessage < Jetstream::Message
13+ end
14+end
···1+# frozen_string_literal: true
2+3+require_relative 'stream'
4+5+require 'json'
6+require 'time'
7+require 'uri'
8+9+module Skyfall
10+11+ #
12+ # Client of a Jetstream service (JSON-based firehose).
13+ #
14+ # This is an equivalent of {Skyfall::Firehose} for Jetstream sources, mirroring its API.
15+ # It returns messages as instances of subclasses of {Skyfall::Jetstream::Message}, which
16+ # are generally equivalent to the respective {Skyfall::Firehose::Message} variants as much
17+ # as possible.
18+ #
19+ # To connect to a Jetstream websocket, you need to:
20+ #
21+ # * create an instance of Jetstream, passing it the hostname/URL of the server, and optionally
22+ # parameters such as cursor or collection/DID filters
23+ # * set up callbacks to be run when connecting, disconnecting, when a message is received etc.
24+ # (you need to set at least a message handler)
25+ # * call {#connect} to start the connection
26+ # * handle the received messages
27+ #
28+ # @example
29+ # client = Skyfall::Jetstream.new('jetstream2.us-east.bsky.network', {
30+ # wanted_collections: 'app.bsky.feed.post',
31+ # wanted_dids: @dids
32+ # })
33+ #
34+ # client.on_message do |msg|
35+ # next unless msg.type == :commit
36+ #
37+ # op = msg.operation
38+ #
39+ # if op.type == :bsky_post && op.action == :create
40+ # puts "[#{msg.time}] #{msg.repo}: #{op.raw_record['text']}"
41+ # end
42+ # end
43+ #
44+ # client.connect
45+ #
46+ # # You might also want to set some or all of these lifecycle callback handlers:
47+ #
48+ # client.on_connecting { |url| puts "Connecting to #{url}..." }
49+ # client.on_connect { puts "Connected" }
50+ # client.on_disconnect { puts "Disconnected" }
51+ # client.on_reconnect { puts "Connection lost, trying to reconnect..." }
52+ # client.on_timeout { puts "Connection stalled, triggering a reconnect..." }
53+ # client.on_error { |e| puts "ERROR: #{e}" }
54+ #
55+ # @note Most of the methods of this class that you might want to use are defined in {Skyfall::Stream}.
56+ #
57+58+ class Jetstream < Stream
59+60+ # Current cursor (time of the last seen message)
61+ # @return [Integer, nil]
62+ attr_accessor :cursor
63+64+ #
65+ # @param server [String] Address of the server to connect to.
66+ # Expects a string with either just a hostname, or a ws:// or wss:// URL with no path.
67+ # @param params [Hash] options, see below:
68+ #
69+ # @option params [Integer] :cursor
70+ # cursor from which to resume
71+ #
72+ # @option params [Array<String>] :wanted_dids
73+ # DID filter to pass to the server (`:wantedDids` is also accepted);
74+ # value should be a DID string or an array of those
75+ #
76+ # @option params [Array<String, Symbol>] :wanted_collections
77+ # collection filter to pass to the server (`:wantedCollections` is also accepted);
78+ # value should be an NSID string or a symbol shorthand, or an array of those
79+ #
80+ # @raise [ArgumentError] if the server parameter or the options are invalid
81+ #
82+ def initialize(server, params = {})
83+ require_relative 'jetstream/message'
84+ super(server)
85+86+ @params = check_params(params)
87+ @cursor = @params.delete(:cursor)
88+ @root_url = ensure_empty_path(@root_url)
89+ end
90+91+92+ protected
93+94+ # Returns the full URL of the websocket endpoint to connect to.
95+ # @return [String]
96+97+ def build_websocket_url
98+ params = @cursor ? @params.merge(cursor: @cursor) : @params
99+ query = URI.encode_www_form(params)
100+101+ @root_url + "/subscribe" + (query.length > 0 ? "?#{query}" : '')
102+ end
103+104+ # Processes a single message received from the websocket. Passes the received data to the
105+ # {#on_raw_message} handler, builds a {Skyfall::Jetstream::Message} object, and passes it to
106+ # the {#on_message} handler (if defined). Also updates the {#cursor} to this message's
107+ # microsecond timestamp (note: this is skipped if {#on_message} is not set).
108+ #
109+ # @param msg
110+ # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent}
111+ # @return [nil]
112+113+ def handle_message(msg)
114+ data = msg.data
115+ @handlers[:raw_message]&.call(data)
116+117+ if @handlers[:message]
118+ jet_message = Message.new(data)
119+ @cursor = jet_message.time_us
120+ @handlers[:message].call(jet_message)
121+ else
122+ @cursor = nil
123+ end
124+ end
125+126+127+ private
128+129+ def check_params(params)
130+ params ||= {}
131+ processed = {}
132+133+ raise ArgumentError.new("Params should be a hash") unless params.is_a?(Hash)
134+135+ params.each do |k, v|
136+ next if v.nil?
137+138+ if k.is_a?(Symbol)
139+ k = k.to_s
140+ elsif !k.is_a?(String)
141+ raise ArgumentError.new("Invalid params key: #{k.inspect}")
142+ end
143+144+ k = k.gsub(/_([a-zA-Z])/) { $1.upcase }.to_sym
145+ processed[k] = check_option(k, v)
146+ end
147+148+ processed
149+ end
150+151+ def check_option(k, v)
152+ case k
153+ when :wantedCollections
154+ check_wanted_collections(v)
155+ when :wantedDids
156+ check_wanted_dids(v)
157+ when :cursor
158+ check_cursor(v)
159+ when :compress, :requireHello
160+ raise ArgumentError.new("Skyfall::Jetstream doesn't support the #{k.inspect} option yet")
161+ else
162+ raise ArgumentError.new("Unknown option: #{k.inspect}")
163+ end
164+ end
165+166+ def check_wanted_collections(list)
167+ list = [list] unless list.is_a?(Array)
168+169+ list.map do |c|
170+ if c.is_a?(String)
171+ # TODO: more validation
172+ c
173+ elsif c.is_a?(Symbol)
174+ Collection.from_short_code(c) or raise ArgumentError.new("Unknown collection symbol: #{c.inspect}")
175+ else
176+ raise ArgumentError.new("Invalid collection argument: #{c.inspect}")
177+ end
178+ end
179+ end
180+181+ def check_wanted_dids(list)
182+ list = [list] unless list.is_a?(Array)
183+184+ if x = list.detect { |c| !c.is_a?(String) || c !~ /\Adid:[a-z]+:/ }
185+ raise ArgumentError.new("Invalid DID argument: #{x.inspect}")
186+ end
187+188+ # TODO: more validation
189+ list
190+ end
191+192+ def check_cursor(cursor)
193+ cursor.to_i
194+ end
195+ end
196+end
···1+# frozen_string_literal: true
2+3+require_relative 'errors'
4+require 'time'
5+6+module Skyfall
7+8+ #
9+ # A single label emitted from the "subscribeLabels" firehose of a labeller service.
10+ #
11+ # The label assigns some specific value - from a list of available values defined by this
12+ # labeller - to a specific target (at:// URI or a DID). In general, this will usually be either
13+ # a "badge" that a user requested to be assigned to themselves from a fun/informative labeller,
14+ # or some kind of (likely negative) label assigned to a user or post by a moderation labeller.
15+ #
16+ # You generally don't need to create instances of this class manually, but will receive them
17+ # from {Skyfall::Firehose} that's connected to `:subscribe_labels` in the {Stream#on_message}
18+ # callback handler (wrapped in a {Skyfall::Firehose::LabelsMessage}).
19+ #
20+21+ class Label
22+23+ # @return [Hash] the label's JSON data
24+ attr_reader :data
25+26+ #
27+ # @param data [Hash] raw label JSON
28+ # @raise [Skyfall::DecodeError] if the data has an invalid format
29+ # @raise [Skyfall::UnsupportedError] if the label is in an unsupported future version
30+ #
31+ def initialize(data)
32+ @data = data
33+34+ raise DecodeError.new("Missing version: #{data}") unless data.has_key?('ver')
35+ raise DecodeError.new("Invalid version: #{ver}") unless ver.is_a?(Integer) && ver >= 1
36+ raise UnsupportedError.new("Unsupported version: #{ver}") unless ver == 1
37+38+ raise DecodeError.new("Missing source: #{data}") unless data.has_key?('src')
39+ raise DecodeError.new("Invalid source: #{src}") unless src.is_a?(String) && src.start_with?('did:')
40+41+ raise DecodeError.new("Missing uri: #{data}") unless data.has_key?('uri')
42+ raise DecodeError.new("Invalid uri: #{uri}") unless uri.is_a?(String)
43+ raise DecodeError.new("Invalid uri: #{uri}") unless uri.start_with?('at://') || uri.start_with?('did:')
44+ end
45+46+ # @return [Integer] label format version number
47+ def version
48+ @data['ver']
49+ end
50+51+ # DID of the labelling authority (the labeller service).
52+ # @return [String]
53+ def authority
54+ @data['src']
55+ end
56+57+ # AT URI or DID of the labelled subject (e.g. a user or post).
58+ # @return [String]
59+ def subject
60+ @data['uri']
61+ end
62+63+ # @return [CID, nil] CID of the specific version of the subject that this label applies to
64+ def cid
65+ @cid ||= @data['cid'] && CID.from_json(@data['cid'])
66+ end
67+68+ # @return [String] label value
69+ def value
70+ @data['val']
71+ end
72+73+ # @return [Boolean] if true, then this is a negation (delete) of an existing label
74+ def negation?
75+ !!@data['neg']
76+ end
77+78+ # @return [Time] timestamp when the label was created
79+ def created_at
80+ @created_at ||= Time.parse(@data['cts'])
81+ end
82+83+ # @return [Time, nil] optional timestamp when the label expires
84+ def expires_at
85+ @expires_at ||= @data['exp'] && Time.parse(@data['exp'])
86+ end
87+88+ alias ver version
89+ alias src authority
90+ alias uri subject
91+ alias val value
92+ alias neg negation?
93+ alias cts created_at
94+ alias exp expires_at
95+ end
96+end
···1-require_relative 'collection'
2-3-module Skyfall
4- class Operation
5- def initialize(message, json)
6- @message = message
7- @json = json
8- end
9-10- def repo
11- @message.repo
12- end
13-14- def path
15- @json['path']
16- end
17-18- def action
19- @json['action'].to_sym
20- end
21-22- def collection
23- @json['path'].split('/')[0]
24- end
25-26- def rkey
27- @json['path'].split('/')[1]
28- end
29-30- def uri
31- "at://#{repo}/#{path}"
32- end
33-34- def cid
35- @cid ||= @json['cid'] && CID.from_cbor_tag(@json['cid'])
36- end
37-38- def raw_record
39- @raw_record ||= cid && @message.blocks.section_with_cid(cid)
40- end
41-42- def type
43- case collection
44- when Collection::BSKY_POST then :bsky_post
45- when Collection::BSKY_LIKE then :bsky_like
46- when Collection::BSKY_FOLLOW then :bsky_follow
47- when Collection::BSKY_REPOST then :bsky_repost
48- when Collection::BSKY_BLOCK then :bsky_block
49- when Collection::BSKY_PROFILE then :bsky_profile
50- when Collection::BSKY_LISTITEM then :bsky_listitem
51- when Collection::BSKY_FEED then :bsky_feed
52- else :unknown
53- end
54- end
55- end
56-end
···1-require_relative 'messages/websocket_message'
23require 'eventmachine'
4require 'faye/websocket'
5require 'uri'
600007module Skyfall
8- class Stream
9- SUBSCRIBE_REPOS = "com.atproto.sync.subscribeRepos"
1011- NAMED_ENDPOINTS = {
12- :subscribe_repos => SUBSCRIBE_REPOS
13- }
0000001415 MAX_RECONNECT_INTERVAL = 300
1617- attr_accessor :heartbeat_timeout, :heartbeat_interval, :cursor, :auto_reconnect
00000001819- def initialize(server, endpoint, cursor = nil)
20- @endpoint = check_endpoint(endpoint)
21- @server = check_hostname(server)
22- @cursor = cursor
00000000000000000000000000000000000000000000000023 @handlers = {}
24- @heartbeat_mutex = Mutex.new
25- @heartbeat_interval = 5
26- @heartbeat_timeout = 30
27- @last_update = nil
28 @auto_reconnect = true
029 @connection_attempts = 0
00000030 end
31000000000032 def connect
33 return if @ws
34000035 url = build_websocket_url
3637 @handlers[:connecting]&.call(url)
00000038 @engines_on = true
3940 EM.run do
···42 @handlers[:error]&.call(e)
43 end
4445- @ws = Faye::WebSocket::Client.new(url)
4647 @ws.on(:open) do |e|
48 @handlers[:connect]&.call
0049 end
5051 @ws.on(:message) do |msg|
052 @connection_attempts = 0
53-54- data = msg.data.pack('C*')
55- @handlers[:raw_message]&.call(data)
56-57- if @handlers[:message]
58- atp_message = Skyfall::WebsocketMessage.new(data)
59- @cursor = atp_message.seq
60- @handlers[:message].call(atp_message)
61- else
62- @cursor = nil
63- end
64 end
6566 @ws.on(:error) do |e|
···70 @ws.on(:close) do |e|
71 @ws = nil
7273- if @auto_reconnect && @engines_on
74- EM.add_timer(reconnect_delay) do
00075 @connection_attempts += 1
76- @handlers[:reconnect]&.call
77 connect
78 end
79 else
080 @engines_on = false
81 @handlers[:disconnect]&.call
82 EM.stop_event_loop unless @ws
···85 end
86 end
8700000000000000088 def disconnect
89 return unless EM.reactor_running?
90091 @engines_on = false
92 EM.stop_event_loop
93 end
9495 alias close disconnect
9697- def on_message(&block)
98- @handlers[:message] = block
000099 end
100101- def on_raw_message(&block)
102- @handlers[:raw_message] = block
0000103 end
104105- def on_connecting(&block)
106- @handlers[:connecting] = block
000000107 end
108109- def on_connect(&block)
110- @handlers[:connect] = block
0000000000000000000000000000000000000000000000000000000000000000000000000000000000111 end
112113- def on_disconnect(&block)
114- @handlers[:disconnect] = block
0000000000000115 end
116117- def on_error(&block)
118- @handlers[:error] = block
00000119 end
120121- def on_reconnect(&block)
122- @handlers[:reconnect] = block
00000000000000000000000000000000000000000000000000123 end
124125126 private
127000000000000000000000000128 def reconnect_delay
129 if @connection_attempts == 0
130 0
···133 end
134 end
135136- def build_websocket_url
137- url = "wss://#{@server}/xrpc/#{@endpoint}"
138- url += "?cursor=#{@cursor}" if @cursor
139- url
140- end
141142- def check_endpoint(endpoint)
143- if endpoint.is_a?(String)
144- raise ArgumentError("Invalid endpoint name: #{endpoint}") if endpoint.strip.empty? || !endpoint.include?('.')
145- elsif endpoint.is_a?(Symbol)
146- raise ArgumentError("Unknown endpoint: #{endpoint}") if NAMED_ENDPOINTS[endpoint].nil?
147- endpoint = NAMED_ENDPOINTS[endpoint]
00148 else
149- raise ArgumentError("Endpoint should be a string or a symbol")
00150 end
151-152- endpoint
153 end
154155- def check_hostname(server)
156- if server.is_a?(String)
157- raise ArgumentError("Invalid server name: #{server}") if server.strip.empty? || server.include?('/')
158- else
159- raise ArgumentError("Server name should be a string")
160 end
161162- server
163 end
164 end
165end
···1+# frozen_string_literal: true
23require 'eventmachine'
4require 'faye/websocket'
5require 'uri'
67+require_relative 'errors'
8+require_relative 'events'
9+require_relative 'version'
10+11module Skyfall
001213+ # Base class of a websocket client. It provides basic websocket client functionality such as
14+ # connecting to the service, keeping the connection alive and running lifecycle callbacks.
15+ #
16+ # In most cases, you will not create instances of this class directly, but rather use either
17+ # {Firehose} or {Jetstream}. Use this class as a superclass if you need to implement some
18+ # custom client for a websocket API that isn't supported yet.
19+20+ class Stream
21+ extend Events
2223 MAX_RECONNECT_INTERVAL = 300
2425+ # If enabled, the client will try to reconnect if the connection is closed unexpectedly.
26+ # (Default: true)
27+ #
28+ # When the reconnect attempt fails, it will wait with an exponential backoff delay before
29+ # retrying again, up to {MAX_RECONNECT_INTERVAL} seconds.
30+ #
31+ # @return [Boolean]
32+ attr_accessor :auto_reconnect
3334+ # User agent sent in the header when connecting.
35+ #
36+ # Default value is {#default_user_agent} = {#version_string} `(Skyfall/x.y)`. It's recommended
37+ # to set it or extend it with some information that indicates what service this is and who is
38+ # running it (e.g. a Bluesky handle).
39+ #
40+ # @return [String]
41+ # @example
42+ # client.user_agent = "my.service (@my.handle) #{client.version_string}"
43+ attr_accessor :user_agent
44+45+ # If enabled, runs a timer which does periodical "heatbeat checks".
46+ #
47+ # The heartbeat timer is started when the client connects to the service, and checks if the stream
48+ # hasn't stalled and is still regularly sending new messages. If no messages are detected for some
49+ # period of time, the client forces a reconnect.
50+ #
51+ # This is **not** enabled by default, because depending on the service you're connecting to, it
52+ # might be normal to not receive any messages for a while.
53+ #
54+ # @see #heartbeat_timeout
55+ # @see #heartbeat_interval
56+ # @return [Boolean]
57+ attr_accessor :check_heartbeat
58+59+ # Interval in seconds between heartbeat checks (default: 10). Only used if {#check_heartbeat} is set.
60+ # @return [Numeric]
61+ attr_accessor :heartbeat_interval
62+63+ # Number of seconds without messages after which reconnect is triggered (default: 300).
64+ # Only used if {#check_heartbeat} is set.
65+ # @return [Numeric]
66+ attr_accessor :heartbeat_timeout
67+68+ # Time when the most recent message was received from the websocket.
69+ #
70+ # Note: this is _local time_ when the message was received; this is different from the timestamp
71+ # of the message, which is the server time of the original source (PDS) when emitting the message,
72+ # and different from a potential `created_at` saved in the record.
73+ #
74+ # @return [Time, nil]
75+ attr_reader :last_update
76+77+ #
78+ # @param server [String] Address of the server to connect to.
79+ # Expects a string with either just a hostname, or a ws:// or wss:// URL.
80+ #
81+ # @raise [ArgumentError] if the server parameter is invalid
82+ #
83+ def initialize(server)
84+ @root_url = build_root_url(server)
85+86 @handlers = {}
000087 @auto_reconnect = true
88+ @check_heartbeat = false
89 @connection_attempts = 0
90+ @heartbeat_interval = 10
91+ @heartbeat_timeout = 300
92+ @last_update = nil
93+ @user_agent = default_user_agent
94+95+ @handlers[:error] = proc { |e| puts "ERROR: #{e}" }
96 end
9798+ #
99+ # Opens a connection to the configured websocket.
100+ #
101+ # This method starts an EventMachine reactor on the current thread, and will only return
102+ # once the connection is closed.
103+ #
104+ # @return [nil]
105+ # @raise [ConfigError] if no message handler has been configured
106+ # @raise [ReactorActiveError] if another stream is already running
107+ #
108 def connect
109 return if @ws
110111+ if @handlers[:message].nil? && @handlers[:raw_message].nil?
112+ raise ConfigError, "Either on_message or on_raw_message handler needs to be set"
113+ end
114+115 url = build_websocket_url
116117 @handlers[:connecting]&.call(url)
118+119+ @reconnect_timer&.cancel
120+ @reconnect_timer = nil
121+122+ raise ReactorActiveError if existing_reactor?
123+124 @engines_on = true
125126 EM.run do
···128 @handlers[:error]&.call(e)
129 end
130131+ @ws = build_websocket_client(url)
132133 @ws.on(:open) do |e|
134 @handlers[:connect]&.call
135+ @last_update = Time.now
136+ start_heartbeat_timer
137 end
138139 @ws.on(:message) do |msg|
140+ @reconnecting = false
141 @connection_attempts = 0
142+ @last_update = Time.now
143+ handle_message(msg)
000000000144 end
145146 @ws.on(:error) do |e|
···150 @ws.on(:close) do |e|
151 @ws = nil
152153+ if @reconnecting || @auto_reconnect && @engines_on
154+ @handlers[:reconnect]&.call
155+156+ @reconnect_timer&.cancel
157+ @reconnect_timer = EM::Timer.new(reconnect_delay) do
158 @connection_attempts += 1
0159 connect
160 end
161 else
162+ stop_heartbeat_timer
163 @engines_on = false
164 @handlers[:disconnect]&.call
165 EM.stop_event_loop unless @ws
···168 end
169 end
170171+ #
172+ # Forces a reconnect, closing the connection and calling {#connect} again.
173+ # @return [nil]
174+ #
175+ def reconnect
176+ @reconnecting = true
177+ @connection_attempts = 0
178+179+ @ws ? @ws.close : connect
180+ end
181+182+ #
183+ # Closes the connection and stops the EventMachine reactor thread.
184+ # @return [nil]
185+ #
186 def disconnect
187 return unless EM.reactor_running?
188189+ @reconnecting = false
190 @engines_on = false
191 EM.stop_event_loop
192 end
193194 alias close disconnect
195196+ #
197+ # Default user agent sent when connecting to the service. (Currently `"#{version_string}"`)
198+ # @return [String]
199+ #
200+ def default_user_agent
201+ version_string
202 end
203204+ #
205+ # Skyfall version string for use in user agent strings (`"Skyfall/x.y"`).
206+ # @return [String]
207+ #
208+ def version_string
209+ "Skyfall/#{Skyfall::VERSION}"
210 end
211212+ def check_heartbeat=(value)
213+ @check_heartbeat = value
214+215+ if @check_heartbeat && @engines_on && @ws && !@heartbeat_timer
216+ start_heartbeat_timer
217+ elsif !@check_heartbeat && @heartbeat_timer
218+ stop_heartbeat_timer
219+ end
220 end
221222+223+ # @!method on_connecting(block)
224+ # Defines a callback to be run when the client tries to open a connection to the websocket.
225+ # Can be also run as a setter `on_connecting=`.
226+ # @param [Proc] block
227+ # @yieldparam [String] url URL to which the client is connecting
228+ # @return [nil]
229+230+ event_handler :connecting
231+232+ # @!method on_connect(block)
233+ # Defines a callback to be run after a connection to the websocket is opened.
234+ # Can be also run as a setter `on_connect=`.
235+ # @param [Proc] block
236+ # @return [nil]
237+238+ event_handler :connect
239+240+ # @!method on_raw_message(block)
241+ # Defines a callback to be run when a message is received, passing a raw data packet as
242+ # received from the websocket (plain text or binary). Can be also run as a setter `on_raw_message=`.
243+ # @param [Proc] block
244+ # @yieldparam [String] data payload of the received message
245+ # @return [nil]
246+247+ event_handler :raw_message
248+249+ # @!method on_message(block)
250+ # Defines a callback to be run when a message is received, passing the message as a parsed
251+ # object of an appropriate message class. Can be also run as a setter `on_message=`.
252+ # @param [Proc] block
253+ # @yieldparam [Object] message parsed message of an appropriate class
254+ # @return [nil]
255+256+ event_handler :message
257+258+ # @!method on_disconnect(block)
259+ # Defines a callback to be run after a connection to the websocket is closed (and the client
260+ # does not reconnect). Can be also run as a setter `on_disconnect=`.
261+ #
262+ # This callback is not run when `on_reconnect` fires.
263+ # @param [Proc] block
264+ # @return [nil]
265+266+ event_handler :disconnect
267+268+ # @!method on_reconnect(block)
269+ # Defines a callback to be run when a connection to the websocket is broken, but the client
270+ # initiates or schedules a reconnect (which may happen after a delay). Can be also run as
271+ # a setter `on_reconnect=`.
272+ # @param [Proc] block
273+ # @return [nil]
274+275+ event_handler :reconnect
276+277+ # @!method on_timeout(block)
278+ # Defines a callback to be run when the heartbeat timer forces a reconnect. A reconnect is
279+ # triggered after not receiving any messages for a period of time specified in {#heartbeat_timeout}
280+ # (if {#check_heartbeat} is enabled). Can be also run as a setter `on_timeout=`.
281+ #
282+ # This callback is also followed by `on_reconnect`.
283+ # @param [Proc] block
284+ # @return [nil]
285+286+ event_handler :timeout
287+288+ # @!method on_error(block)
289+ # Defines a callback to be run when the websocket connection returns an error. Can be also
290+ # run as a setter `on_error=`.
291+ #
292+ # Default handler prints the error to stdout.
293+ #
294+ # @param [Proc] block
295+ # @yieldparam [Exception] error the received error
296+ # @return [nil]
297+298+ event_handler :error
299+300+301+ # Returns a string with a representation of the object for debugging purposes.
302+ # @return [String]
303+ def inspect
304+ vars = inspectable_variables.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
305+ "#<#{self.class}:0x#{object_id} #{vars}>"
306 end
307308+309+ protected
310+311+ # @note This method is designed to be overridden in subclasses.
312+ #
313+ # Returns the full URL of the websocket endpoint to connect to, with path and query parameters
314+ # if needed. The base implementation simply returns the base URL passed to the initializer.
315+ #
316+ # Override this method in subclasses to point to the specific endpoint and add necessary
317+ # parameters like cursor or filters, depending on the arguments passed to the constructor.
318+ #
319+ # @return [String]
320+321+ def build_websocket_url
322+ @root_url
323 end
324325+ # Builds and configures a websocket client object that is used to connect to the requested service.
326+ #
327+ # @return [Faye::WebSocket::Client]
328+ # see {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/Client Faye::WebSocket::Client}
329+330+ def build_websocket_client(url)
331+ Faye::WebSocket::Client.new(url, nil, { headers: { 'User-Agent' => user_agent }.merge(request_headers) })
332 end
333334+ # @note This method is designed to be overridden in subclasses.
335+ #
336+ # Processes a single message received from the websocket. The implementation is expected to
337+ # parse the message from a plain text or binary form, build an appropriate message object,
338+ # and call the `:message` and/or `:raw_message` callback handlers, passing the right parameters.
339+ #
340+ # The base implementation simply takes the message data and passes it as is to `:raw_message`,
341+ # and does not call `:message` at all.
342+ #
343+ # @param msg
344+ # {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/API/MessageEvent Faye::WebSocket::API::MessageEvent}
345+ # @return [nil]
346+347+ def handle_message(msg)
348+ data = msg.data
349+ @handlers[:raw_message]&.call(data)
350+ end
351+352+ # Additional headers to pass with the request when connecting to the websocket endpoint.
353+ # The user agent header (built from {#user_agent}) is added separately.
354+ #
355+ # The base implementation returns an empty hash.
356+ #
357+ # @return [Hash] a hash of `{ header_name => header_value }`
358+359+ def request_headers
360+ {}
361+ end
362+363+ # Returns the underlying websocket client object. It can be used e.g. to send messages back
364+ # to the server (but see also: {#send_data}).
365+ #
366+ # @return [Faye::WebSocket::Client]
367+ # see {https://rubydoc.info/gems/faye-websocket/Faye/WebSocket/Client Faye::WebSocket::Client}
368+369+ def socket
370+ @ws
371+ end
372+373+ # Sends a message back to the server.
374+ #
375+ # @param data [String, Array] the message to send -
376+ # a string for text websockets, a binary string or byte array for binary websockets
377+ # @return [Boolean] true if the message was sent successfully
378+379+ def send_data(data)
380+ @ws.send(data)
381+ end
382+383+ # @return [Array<Symbol>] list of instance variables to be printed in the {#inspect} output
384+ def inspectable_variables
385+ instance_variables - [:@handlers, :@ws]
386 end
387388389 private
390391+ def existing_reactor?
392+ EM.reactor_running? && !@engines_on
393+ end
394+395+ def start_heartbeat_timer
396+ return if !@check_heartbeat || @heartbeat_interval.to_f <= 0 || @heartbeat_timeout.to_f <= 0
397+ return if @heartbeat_timer
398+399+ @heartbeat_timer = EM::PeriodicTimer.new(@heartbeat_interval) do
400+ next if @ws.nil? || @heartbeat_timeout.to_f <= 0
401+ time_passed = Time.now - @last_update
402+403+ if time_passed > @heartbeat_timeout
404+ @handlers[:timeout]&.call
405+ reconnect
406+ end
407+ end
408+ end
409+410+ def stop_heartbeat_timer
411+ @heartbeat_timer&.cancel
412+ @heartbeat_timer = nil
413+ end
414+415 def reconnect_delay
416 if @connection_attempts == 0
417 0
···420 end
421 end
422423+ def build_root_url(server)
424+ if !server.is_a?(String)
425+ raise ArgumentError, "Server parameter should be a string"
426+ end
0427428+ if server.include?('://')
429+ uri = URI(server)
430+431+ if uri.scheme != 'ws' && uri.scheme != 'wss'
432+ raise ArgumentError, "Server parameter should be a hostname or a ws:// or wss:// URL"
433+ end
434+435+ uri.to_s
436 else
437+ server = "wss://#{server}"
438+ uri = URI(server) # raises if invalid
439+ server
440 end
00441 end
442443+ def ensure_empty_path(url)
444+ url = url.chomp('/')
445+446+ if URI(url).path != ''
447+ raise ArgumentError, "Server URL should only include a hostname, without any path"
448 end
449450+ url
451 end
452 end
453end
+1-1
lib/skyfall/version.rb
···1# frozen_string_literal: true
23module Skyfall
4- VERSION = "0.2.2"
5end
···1# frozen_string_literal: true
23module Skyfall
4+ VERSION = "0.6.1"
5end