+11
-4
Gemfile
+11
-4
Gemfile
···
1
source "https://rubygems.org"
2
3
-
gem 'blue_factory', '~> 0.1', '>= 0.1.4'
4
-
gem 'skyfall', '~> 0.2', '>= 0.2.3'
5
6
gem 'activerecord', '~> 8.0'
7
gem 'sinatra-activerecord', '~> 2.0'
8
-
gem 'sqlite3'
9
gem 'rake'
10
gem 'rainbow'
11
12
group :development do
13
-
gem 'irb'
14
gem 'debug'
15
gem 'thin'
16
gem 'capistrano', '~> 2.0'
17
end
···
1
source "https://rubygems.org"
2
3
+
gem 'blue_factory', '~> 0.1.6'
4
+
gem 'skyfall', '~> 0.6'
5
+
gem 'didkit', '~> 0.2'
6
7
gem 'activerecord', '~> 8.0'
8
gem 'sinatra-activerecord', '~> 2.0'
9
+
gem 'sqlite3', '~> 2.5'
10
gem 'rake'
11
gem 'rainbow'
12
+
gem 'irb'
13
14
group :development do
15
gem 'debug'
16
gem 'thin'
17
gem 'capistrano', '~> 2.0'
18
+
19
+
# for net-ssh & capistrano
20
+
gem 'ed25519', '>= 1.2', '< 2.0'
21
+
gem 'bcrypt_pbkdf', '>= 1.0', '< 2.0'
22
end
23
+
24
+
gem 'ostruct' # for rake, to remove when rake is updated
+45
-33
Gemfile.lock
+45
-33
Gemfile.lock
···
21
tzinfo (~> 2.0, >= 2.0.5)
22
uri (>= 0.13.1)
23
base32 (0.3.4)
24
-
base64 (0.2.0)
25
-
benchmark (0.4.0)
26
-
bigdecimal (3.1.9)
27
-
blue_factory (0.1.4)
28
sinatra (~> 3.0)
29
capistrano (2.15.11)
30
highline
···
34
net-ssh-gateway (>= 1.1.0)
35
cbor (0.5.9.8)
36
concurrent-ruby (1.3.5)
37
-
connection_pool (2.5.0)
38
daemons (1.4.1)
39
date (3.4.1)
40
-
debug (1.10.0)
41
irb (~> 1.10)
42
reline (>= 0.3.8)
43
-
drb (2.2.1)
44
eventmachine (1.2.7)
45
-
faye-websocket (0.11.3)
46
eventmachine (>= 0.12.0)
47
-
websocket-driver (>= 0.5.1)
48
-
highline (2.1.0)
49
i18n (1.14.7)
50
concurrent-ruby (~> 1.0)
51
-
io-console (0.8.0)
52
-
irb (1.15.1)
53
pp (>= 0.6.0)
54
rdoc (>= 4.0.0)
55
reline (>= 0.4.2)
56
-
logger (1.6.6)
57
minitest (5.25.5)
58
mustermann (3.0.3)
59
ruby2_keywords (~> 0.0.1)
60
-
net-scp (4.0.0)
61
net-ssh (>= 2.6.5, < 8.0.0)
62
net-sftp (4.0.0)
63
net-ssh (>= 5.0.0, < 8.0.0)
64
-
net-ssh (7.2.0)
65
net-ssh-gateway (2.0.0)
66
net-ssh (>= 4.0.0)
67
pp (0.6.2)
68
prettyprint
69
prettyprint (0.2.0)
70
-
psych (5.2.3)
71
date
72
stringio
73
-
rack (2.2.13)
74
rack-protection (3.2.0)
75
base64 (>= 0.1.0)
76
rack (~> 2.2, >= 2.2.4)
77
rainbow (3.1.1)
78
-
rake (13.2.1)
79
-
rdoc (6.13.0)
80
psych (>= 4.0.0)
81
-
reline (0.6.0)
82
io-console (~> 0.5)
83
ruby2_keywords (0.0.5)
84
securerandom (0.4.1)
···
90
sinatra-activerecord (2.0.28)
91
activerecord (>= 4.1)
92
sinatra (>= 1.0)
93
-
skyfall (0.5.0)
94
base32 (~> 0.3, >= 0.3.4)
95
base64 (~> 0.1)
96
cbor (~> 0.5, >= 0.5.9.6)
97
eventmachine (~> 1.2, >= 1.2.7)
98
-
faye-websocket (~> 0.11)
99
-
sqlite3 (2.6.0-arm64-darwin)
100
-
sqlite3 (2.6.0-x86_64-linux-gnu)
101
-
stringio (3.1.6)
102
-
thin (1.8.2)
103
daemons (~> 1.0, >= 1.0.9)
104
eventmachine (~> 1.0, >= 1.0.4)
105
-
rack (>= 1, < 3)
106
-
tilt (2.6.0)
107
timeout (0.4.3)
108
tzinfo (2.0.6)
109
concurrent-ruby (~> 1.0)
110
uri (1.0.3)
111
-
websocket-driver (0.7.7)
112
base64
113
websocket-extensions (>= 0.1.0)
114
websocket-extensions (0.1.5)
···
119
120
DEPENDENCIES
121
activerecord (~> 8.0)
122
-
blue_factory (~> 0.1, >= 0.1.4)
123
capistrano (~> 2.0)
124
debug
125
irb
126
rainbow
127
rake
128
sinatra-activerecord (~> 2.0)
129
-
skyfall (~> 0.2, >= 0.2.3)
130
-
sqlite3
131
thin
132
133
BUNDLED WITH
134
-
2.6.6
···
21
tzinfo (~> 2.0, >= 2.0.5)
22
uri (>= 0.13.1)
23
base32 (0.3.4)
24
+
base64 (0.3.0)
25
+
bcrypt_pbkdf (1.1.1)
26
+
benchmark (0.4.1)
27
+
bigdecimal (3.2.2)
28
+
blue_factory (0.1.6)
29
sinatra (~> 3.0)
30
capistrano (2.15.11)
31
highline
···
35
net-ssh-gateway (>= 1.1.0)
36
cbor (0.5.9.8)
37
concurrent-ruby (1.3.5)
38
+
connection_pool (2.5.3)
39
daemons (1.4.1)
40
date (3.4.1)
41
+
debug (1.11.0)
42
irb (~> 1.10)
43
reline (>= 0.3.8)
44
+
didkit (0.2.3)
45
+
drb (2.2.3)
46
+
ed25519 (1.4.0)
47
+
erb (5.0.2)
48
eventmachine (1.2.7)
49
+
faye-websocket (0.12.0)
50
eventmachine (>= 0.12.0)
51
+
websocket-driver (>= 0.8.0)
52
+
highline (3.1.2)
53
+
reline
54
i18n (1.14.7)
55
concurrent-ruby (~> 1.0)
56
+
io-console (0.8.1)
57
+
irb (1.15.2)
58
pp (>= 0.6.0)
59
rdoc (>= 4.0.0)
60
reline (>= 0.4.2)
61
+
logger (1.7.0)
62
minitest (5.25.5)
63
mustermann (3.0.3)
64
ruby2_keywords (~> 0.0.1)
65
+
net-scp (4.1.0)
66
net-ssh (>= 2.6.5, < 8.0.0)
67
net-sftp (4.0.0)
68
net-ssh (>= 5.0.0, < 8.0.0)
69
+
net-ssh (7.3.0)
70
net-ssh-gateway (2.0.0)
71
net-ssh (>= 4.0.0)
72
+
ostruct (0.6.2)
73
pp (0.6.2)
74
prettyprint
75
prettyprint (0.2.0)
76
+
psych (5.2.6)
77
date
78
stringio
79
+
rack (2.2.17)
80
rack-protection (3.2.0)
81
base64 (>= 0.1.0)
82
rack (~> 2.2, >= 2.2.4)
83
rainbow (3.1.1)
84
+
rake (13.3.0)
85
+
rdoc (6.14.2)
86
+
erb
87
psych (>= 4.0.0)
88
+
reline (0.6.1)
89
io-console (~> 0.5)
90
ruby2_keywords (0.0.5)
91
securerandom (0.4.1)
···
97
sinatra-activerecord (2.0.28)
98
activerecord (>= 4.1)
99
sinatra (>= 1.0)
100
+
skyfall (0.6.0)
101
base32 (~> 0.3, >= 0.3.4)
102
base64 (~> 0.1)
103
cbor (~> 0.5, >= 0.5.9.6)
104
eventmachine (~> 1.2, >= 1.2.7)
105
+
faye-websocket (~> 0.12)
106
+
sqlite3 (2.7.2-arm64-darwin)
107
+
sqlite3 (2.7.2-x86_64-linux-gnu)
108
+
stringio (3.1.7)
109
+
thin (2.0.1)
110
daemons (~> 1.0, >= 1.0.9)
111
eventmachine (~> 1.0, >= 1.0.4)
112
+
logger
113
+
rack (>= 1, < 4)
114
+
tilt (2.6.1)
115
timeout (0.4.3)
116
tzinfo (2.0.6)
117
concurrent-ruby (~> 1.0)
118
uri (1.0.3)
119
+
websocket-driver (0.8.0)
120
base64
121
websocket-extensions (>= 0.1.0)
122
websocket-extensions (0.1.5)
···
127
128
DEPENDENCIES
129
activerecord (~> 8.0)
130
+
bcrypt_pbkdf (>= 1.0, < 2.0)
131
+
blue_factory (~> 0.1.6)
132
capistrano (~> 2.0)
133
debug
134
+
didkit (~> 0.2)
135
+
ed25519 (>= 1.2, < 2.0)
136
irb
137
+
ostruct
138
rainbow
139
rake
140
sinatra-activerecord (~> 2.0)
141
+
skyfall (~> 0.6)
142
+
sqlite3 (~> 2.5)
143
thin
144
145
BUNDLED WITH
146
+
2.6.9
+1
-1
LICENSE.txt
+1
-1
LICENSE.txt
+159
-22
README.md
+159
-22
README.md
···
1
-
<h1>Bluesky feeds in Ruby <img src="https://github.com/mackuba/bluesky-feeds-rb/assets/28465/81159f5a-82f6-4520-82c1-434057905a2c" style="width: 28px; margin-left: 5px; position: relative; top: 1px;"></h1>
2
3
-
This repo is an example or template that you can use to create a "feed generator" service for the Bluesky social network that hosts custom feeds. It's a reimplementation of the official TypeScript [feed-generator](https://github.com/bluesky-social/feed-generator) example in Ruby.
4
5
6
-
## How do feed generators work
7
8
-
**\#TODO** - please read the README of the official [feed-generator](https://github.com/bluesky-social/feed-generator) project.
9
10
11
## Architecture of the app
12
13
The project can be divided into three major parts:
14
15
-
1. The "input" part, which subscribes to the firehose stream on the Bluesky server, reads and processes all incoming messages, and saves relevant posts and any other data to a local database.
16
2. The "output" part, which makes the list of posts available as a feed server that implements the required "feed generator" endpoints.
17
-
3. Everything else in the middle - the database, the models and feed classes.
18
19
-
The first two parts were mostly abstracted away in the forms of two Ruby gems, namely [skyfall](https://github.com/mackuba/skyfall) for connecting to the firehose and [blue_factory](https://github.com/mackuba/blue_factory) for hosting the feed generator interface. The part in the middle is mostly up to you, since it depends greatly on what exactly you want to achieve (what kind of feed algorithms to implement, what data you need to keep, what database to use and so on) - but you can use this project as a good starting point.
20
21
-
See the repositories of these two projects for more info on what they implement and how you can configure and use them.
22
23
24
## Setting up
25
26
-
First, you need to set up the database. By default, the app is configured to use SQLite and to create database files in `db` directory. If you want to use e.g. MySQL or PostgreSQL, you need to add a different database adapter gem to the [`Gemfile`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/Gemfile) and change the configuration in [`config/database.yml`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/config/database.yml).
27
28
To create the database, run the migrations:
29
···
31
bundle exec rake db:migrate
32
```
33
34
-
The feed configuration is done in [`app/config.rb`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/app/config.rb). You need to set there:
35
36
- the DID identifier of the publisher (your account)
37
- the hostname on which the service will be running
38
39
-
Next, you need to create some feed classes in [`app/feeds`](https://github.com/mackuba/bluesky-feeds-rb/tree/master/app/feeds). See the included feeds like [`StarWarsFeed`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/app/feeds/star_wars_feed.rb) as an example. The [`Feed`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/app/feeds/feed.rb) superclass provides a `#get_posts` implementation which loads the posts in a feed in response to a request and passes the URIs to the server.
40
41
Once you have the feeds prepared, configure them in `app/config.rb`:
42
···
44
BlueFactory.add_feed 'starwars', StarWarsFeed.new
45
```
46
47
48
-
### Running in development
49
50
-
To run the firehose stream, use the [`firehose.rb`](https://github.com/mackuba/bluesky-feeds-rb/tree/master/firehose.rb) script. By default, it will save all posts to the database and print progress dots for every saved post, and will print the text of each post that matches any feed's conditions. See the options in the file to change this.
51
52
-
In another terminal window, use the [`server.rb`](https://github.com/mackuba/bluesky-feeds-rb/tree/master/server.rb) script to run the server. It should respond to such requests:
53
54
```
55
curl -i http://localhost:3000/.well-known/did.json
···
57
curl -i http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:.../app.bsky.feed.generator/starwars
58
```
59
60
-
### Running in production
61
62
-
First, you need to make sure that the firehose script is always running and is restarted if necessary. One option to do this could be writing a `systemd` service config file and adding it to `/etc/systemd/system`. You can find an example service file in [`dist/bsky_feeds.service`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/dist/bsky_feeds.service).
63
64
-
To run the server part, you need an HTTP server and a Ruby app server. The choice is up to you and the configuration will depend on your selected config. My recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. `systemd` like the firehose). You can find an example of Nginx configuration for Passenger in [`dist/feeds-nginx.conf`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/dist/feeds-nginx.conf).
65
66
67
-
## Publishing the feed
68
69
Once you have the feed deployed to the production server, you can use the `bluesky:publish` Rake task (from the [blue_factory](https://github.com/mackuba/blue_factory) gem) to upload the feed configuration to the Bluesky network.
70
71
You need to make sure that you have configured the feed's metadata in the feed class:
72
73
-
- `display_name` (required) - the publicly visible name of your feed, e.g. "Star Wars Feed" (should be something short)
74
-
- `description` (optional) - a longer (~1-2 lines) description of what the feed does, displayed on the feed page as the "bio"
75
-
- `avatar_file` (optional) - path to an avatar image from the project's root (PNG or JPG)
76
77
When you're ready, run the rake task passing the feed key (you will be asked for the uploader account's password):
78
···
81
```
82
83
84
## Credits
85
86
-
Copyright © 2023 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/mackuba.eu)).
87
88
The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
89
···
1
+
<h1>Bluesky feeds in Ruby <img src="https://raw.githubusercontent.com/mackuba/bluesky-feeds-rb/refs/heads/master/images/ruby.png" width="26"></h1>
2
+
3
+
This repo is an example or template that you can use to create a "feed generator" service for the Bluesky social network which hosts custom feeds. It's a reimplementation of the official TypeScript [feed-generator](https://github.com/bluesky-social/feed-generator) example in Ruby.
4
+
5
+
This app is extracted from my personal feed service app running on [blue.mackuba.eu](https://blue.mackuba.eu) which hosts all my custom feeds. My own project has the exact same structure, it just has more feeds, models and stuff in it (and I recently migrated it to Postgres).
6
7
+
8
+
## How feed generators work
9
+
10
+
This is well explained on the Bluesky documentation site, in the section "[Custom Feeds](https://docs.bsky.app/docs/starter-templates/custom-feeds)", and in the readme of the official TypeScript [feed-generator](https://github.com/bluesky-social/feed-generator) project.
11
+
12
+
The gist is this:
13
14
+
- you (feed operator) run a service on your server, which implements a few specific XRPC endpoints
15
+
- a feed record is uploaded to your account, including metadata and location of the feed generator service
16
+
- when the user wants to load the feed, the AppView makes a request to your service on their behalf
17
+
- your service looks at the request params, and returns a list of posts it selected in the form of at:// URIs
18
+
- the AppView takes those URIs and maps them to full posts, which it returns to the user's app
19
20
+
How exactly those posts are selected to be returned in the given request is completely up to you; the only requirement is that these are posts that the AppView will have in its database, since you only send URIs, not actual post data. In most cases, these will be "X latest posts matching some condition". In the request, you get the URI of the specific feed (there can be, and usually is, more than one on the service), `limit`, `cursor`, and an authentication token from which you can extract the DID of the calling user (in case the feed is a personalized one).
21
22
+
It's not a strict requirement, but in order to be able to pick and return those post URIs, in almost all cases the feed service also needs to have a separate component that streams posts from the relay "firehose" and saves some or all of them to a local database.
23
24
25
## Architecture of the app
26
27
The project can be divided into three major parts:
28
29
+
1. The "input" part, which subscribes to the firehose stream on the Bluesky relay, reads and processes all incoming messages, and saves relevant posts and any other data to a local database.
30
2. The "output" part, which makes the list of posts available as a feed server that implements the required "feed generator" endpoints.
31
+
3. Everything else in the middle – the database, the models and feed classes.
32
+
33
+
The first two parts were mostly abstracted away in the forms of two Ruby gems, namely [skyfall](https://github.com/mackuba/skyfall) for connecting to the firehose and [blue_factory](https://github.com/mackuba/blue_factory) for hosting the feed generator interface. See the repositories of these two projects for more info on what they implement and how you can configure and use them.
34
+
35
+
The part in the middle is mostly up to you, since it depends greatly on what exactly you want to achieve (what kind of feed algorithms to implement, what data you need to keep, what database to use and so on) – but you can use this project as a good starting point.
36
+
37
+
(The rest of the readme assumes you know Ruby to some degree and are at least somewhat familiar with ActiveRecord.)
38
+
39
+
40
+
### Feeds
41
+
42
+
The Bluesky API allows you to run feeds using basically any algorithm you want, and there are several main categories of feeds: chronological feeds based on keywords or some simple conditions, "top of the week" feeds sorted by number of likes, or Discover-like feeds with a personalized algorithm and a random-ish order.
43
+
44
+
The [blue_factory](https://github.com/mackuba/blue_factory) gem used here should allow you to build any kind of feed you want; its main API is a `get_posts` method that you implement in a feed class, which gets request params and returns an array of hashes with post URIs. The decision on how to pick these URIs is up to you.
45
+
46
+
However, the sample code in this project is mostly targeted at the most common type of feeds, the keyword-based chronological ones. It defines a base feed class [Feed](app/feeds/feed.rb), which includes an implementation of `get_posts` that loads post records from the database, where they have been already assigned earlier to the given feed, so the request handling involves only a very simple query filtered by feed ID. The subclasses of `Feed` provide their versions of a `post_matches?` method, which is used by the firehose client process to determine where a post should be added.
47
+
48
+
If you want to implement a different kind of feed, e.g. a Following-style feed like my "[Follows & Replies](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr/feed/follows-replies)", that should also be possible with this architecture, but you need to implement a custom version of `get_posts` in a given feed class that does some more complex queries.
49
+
50
+
The feed classes also include a set of simple getter methods that return metadata about the given feed, like name, description, avatar etc.
51
+
52
+
53
+
### Database & models
54
+
55
+
By default, the app is configured to use SQLite and to create database files in the `db` directory. Using MySQL or PostgreSQL should also be possible with some minor changes in the code (I've tried both) – but SQLite has been working mostly fine for me in production with a database as large as 200+ GB (>200 mln post records). The important thing here is that there's only one "writer" (the firehose process), and the Sinatra server process(es) only read data, so you don't normally run into concurrent write issues, unless you add different unrelated features.
56
+
57
+
There are two main tables/models: [Post](app/models/post.rb) and [FeedPost](app/models/feed_post.rb). `Post` stores post records as received from the Bluesky relay – the DID of the author, "rkey" of the post record, the post text, and other data as JSON.
58
+
59
+
`FeedPost` records link specific posts into specific feeds. When a post is saved, the post instance is passed to all configured feed classes, and each of them checks (via `post_matches?`) if the post matches the feed's keywords and if it should be included in that feed. In such case, a matching `FeedPost` is also created. `feed_posts` is kind of like a many-to-many join table, except there is no `feeds` table, it's sort of virtual (feeds are defined in code). `FeedPost` records have a `post_id` referencing a `Post`, and a `feed_id` with the feed number, which is defined in subclasses of the `Feed` class (which is *not* an AR model). Each `Feed` class has one different `feed_id` assigned in code.
60
+
61
+
The app can be configured to either save every single post, with only some of them having `FeedPost` records referencing them, or to save only the posts which have been added to at least one feed. The mode is selected by using the options `-da` or `-dm` respectively in [`bin/firehose`](bin/firehose). By default, the app uses `-da` (all) mode in development and the `-dm` (matching) mode in production.
62
+
63
+
Saving all posts allows you to rescan posts when you make changes to a feed and include older posts that were skipped before, but at the cost of storing orders of magnitude more data (around 4 mln posts daily as of July 2025). Saving only matching posts keeps the database much more compact and manageable, but without the ability to re-check missed older posts (or to build feeds using different algorithms than plain keyword matching, e.g. Following-style feeds).
64
+
65
+
There is an additional `subscriptions` table, which stores the most recent cursor for the relay you're connecting to. This is used when reconnecting after network connection issues or downtime, so you can catch up on the events you missed since the last known position.
66
+
67
+
68
+
### Firehose client
69
+
70
+
The firehose client service, using the [skyfall](https://github.com/mackuba/skyfall) gem, is implemented in [`app/firehose_stream.rb`](app/firehose_stream.rb). Skyfall handles things like connecting to the websocket and parsing the returned messages; what you need is mostly to provide lifecycle callbacks (which mostly print logs), and to handle the incoming messages by checking the message type and included data.
71
+
72
+
The most important message type is `:commit`, which includes record operations. For those messages, we check the record type and process the record accordingly – in this case, we're only really looking at post records (`:bsky_post`). For "delete" events we find and delete a `Post` record, for "create" events we build one from the provided data, then pass it through all configured feeds to see if it matches any, then optionally create `FeedPost` references.
73
+
74
+
All processing here is done inline, single-threaded, within the event processing loop. This should be [more than fine](https://journal.mackuba.eu/2025/06/24/firehose-go-brrr/) in practice even with firehose traffic several times bigger than it is now, as long as you aren't doing (a lot of) network requests within the loop. This could be expanded into a multi-process setup with a Redis queue and multiple workers, but right now there's no need for that.
75
+
76
77
+
### XRPC Server
78
79
+
The server implementation is technically in the [blue_factory](https://github.com/mackuba/blue_factory) gem. It's based on [Sinatra](https://sinatrarb.com), and the Sinatra class implementing the 3 required endpoints is included there in [`server.rb`](https://github.com/mackuba/blue_factory/blob/master/lib/blue_factory/server.rb) and can be used as is. It's configured using static methods on the `BlueFactory` module, which is done in [`app/config.rb`](app/config.rb) here.
80
+
81
+
As an optional thing, the [`app/server.rb`](app/server.rb) file includes some slightly convoluted code that lets you run a block of code in the context of the Sinatra server class, where you can use any normal [Sinatra APIs](https://sinatrarb.com/intro.html) to define additional routes, helpers, change Sinatra configuration, and so on. The example there adds a root `/` endpoint, which returns simple HTML listing available feeds.
82
83
84
## Setting up
85
86
+
This app should run on any somewhat recent version of Ruby, but of course it's recommended to run one that's still maintained, ideally the latest one. It's also recommended to install it with [YJIT support](https://www.leemeichin.com/posts/ruby-32-jit.html), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc).
87
+
88
+
First, you need to install the dependencies of course:
89
+
90
+
```
91
+
bundle install
92
+
```
93
+
94
+
Next, set up the SQLite database. If you want to use e.g. MySQL or PostgreSQL, you need to add a different database adapter gem to the [`Gemfile`](./Gemfile) and change the configuration in [`config/database.yml`](config/database.yml).
95
96
To create the database, run the migrations:
97
···
99
bundle exec rake db:migrate
100
```
101
102
+
103
+
### Configuration
104
+
105
+
The feed configuration is done in [`app/config.rb`](app/config.rb). You need to set there:
106
107
- the DID identifier of the publisher (your account)
108
- the hostname on which the service will be running
109
110
+
Next, you need to create some feed classes in [`app/feeds`](app/feeds). See the included feeds like [StarWarsFeed](app/feeds/star_wars_feed.rb) as an example.
111
112
Once you have the feeds prepared, configure them in `app/config.rb`:
113
···
115
BlueFactory.add_feed 'starwars', StarWarsFeed.new
116
```
117
118
+
The first argument is the "rkey" which will be visible at the end of the feed URL.
119
+
120
+
If you want to implement some kind of authentication or personalization in your feeds, uncomment the `:enable_unsafe_auth` line in the `config.rb`, and see the commented out alternative implementation of `get_posts` in [`app/feeds/feed.rb`](app/feeds/feed.rb#L68-L99).
121
+
122
+
(Note: as the "unsafe" part in the name implies, this does not currently fully validate the user tokens – see the "[Authentication](https://github.com/mackuba/blue_factory#authentication)" section in the `blue_factory` readme for more info.)
123
+
124
+
125
+
## Running in development
126
+
127
+
The app uses two separate processes, for the firehose stream client, and for the XRPC server that handles incoming feed requests.
128
129
+
To run the firehose client, use the [`bin/firehose`](bin/firehose) script. By default, it will save all posts to the database and print progress dots for every saved post, and will print the text of each post that matches any feed's conditions. See the options in the file or in `--help` output to change this.
130
131
+
The app uses one of Bluesky's official [Jetstream](https://github.com/bluesky-social/jetstream) servers as the source by default. If you want to use a different Jetstream server, edit `DEFAULT_JETSTREAM` in [`app/firehose_stream.rb`](app/firehose_stream.rb#L14), or pass a `FIREHOSE=...` env variable on the command line. You can also use a full ATProto relay instead – in that case you will also need to replace the [initializer in the `start` method](app/firehose_stream.rb#L39-L45).
132
133
+
In another terminal window, use the [`bin/server`](bin/server) script to run the server. It should respond to such requests:
134
135
```
136
curl -i http://localhost:3000/.well-known/did.json
···
138
curl -i http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:.../app.bsky.feed.generator/starwars
139
```
140
141
+
### Useful Rake tasks
142
+
143
+
While working on feeds, you may find these two Rake tasks useful:
144
+
145
+
```
146
+
bundle exec rake print_feed KEY=starwars | less -r
147
+
```
148
149
+
This task prints the posts included in a feed in a readable format, reverse-chronologically. Optionally, add e.g. `N=500` to include more entries (default is 100).
150
151
+
```
152
+
bundle exec rake rebuild_feed ...
153
+
```
154
155
+
This task rescans the posts in the database after you edit some feed code, and adds/removes posts to the feed if they now match / no longer match. It has three main modes:
156
157
+
```
158
+
bundle exec rake rebuild_feed KEY=starwars DAYS=7
159
+
```
160
+
161
+
- removes all current posts from the feed, scans the given number of days back and re-adds matching posts to the feed
162
+
163
+
```
164
+
bundle exec rake rebuild_feed KEY=starwars DAYS=7 APPEND_ONLY=1
165
+
```
166
+
167
+
- scans the given number of days back and adds additional matching posts to the feed, but without touching existing posts in the feed
168
+
169
+
```
170
+
bundle exec rake rebuild_feed KEY=starwars ONLY_EXISTING=1
171
+
```
172
+
173
+
- quickly checks only posts included currently in the feed, and removes them if needed
174
+
175
+
There are also `DRY_RUN`, `VERBOSE` and `UNSAFE` env options, see [`feeds.rake`](lib/tasks/feeds.rake) for more info.
176
+
177
+
178
+
## Running in production
179
+
180
+
In my Ruby projects I'm using the classic [Capistrano](https://capistranorb.com) tool to deploy to production servers (and in the ancient 2.x version, since I still haven't found the time to migrate my setup scripts to 3.x…). There is a sample [`deploy.rb`](config/deploy.rb) config file included in the `config` directory. To use something like Docker or a service like Heroku, you'll need to adapt the config for your specific setup.
181
+
182
+
On the server, you need to make sure that the firehose process is always running and is restarted if necessary. One option to do this could be writing a systemd service config file and adding it to `/etc/systemd/system`. You can find an example service file in [`dist/bsky_feeds.service`](dist/bsky_feeds.service).
183
+
184
+
To run the XRPC service, you need an HTTP server (reverse proxy) and a Ruby app server. The choice is up to you, and the configuration will depend on the setup you choose. My recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd like the firehose). You can find an example of Nginx configuration for Passenger in [`dist/feeds-nginx.conf`](dist/feeds-nginx.conf).
185
+
186
+
187
+
### Publishing the feed
188
189
Once you have the feed deployed to the production server, you can use the `bluesky:publish` Rake task (from the [blue_factory](https://github.com/mackuba/blue_factory) gem) to upload the feed configuration to the Bluesky network.
190
191
You need to make sure that you have configured the feed's metadata in the feed class:
192
193
+
- `display_name` (required) – the publicly visible name of your feed, e.g. "Star Wars Feed" (should be something short)
194
+
- `description` (optional) – a longer (~1-2 lines) description of what the feed does, displayed on the feed page as the "bio"
195
+
- `avatar_file` (optional) – path to an avatar image from the project's root (PNG or JPG)
196
197
When you're ready, run the rake task passing the feed key (you will be asked for the uploader account's password):
198
···
201
```
202
203
204
+
### App maintenance
205
+
206
+
If you're running the app in "save all" mode in production, at some point you will probably need to start cleaning up older posts periodically. You can use this Rake task for this:
207
+
208
+
```
209
+
bundle exec rake cleanup_posts DAYS=30
210
+
```
211
+
212
+
This will delete posts older than 30 days, but only if they aren't assigned to any feed.
213
+
214
+
Another Rake task lets you remove a specific post manually from a feed – this might be useful e.g. if you notice that something unexpected has been included in your feed, and you want to quickly delete it from there without having to edit & redeploy the code:
215
+
216
+
```
217
+
bundle exec rake delete_feed_item URL=https://bsky.app/profile/example.com/post/xxx
218
+
```
219
+
220
+
221
## Credits
222
223
+
Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/mackuba.eu)).
224
225
The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
226
+4
Rakefile
+4
Rakefile
+3
-6
app/config.rb
+3
-6
app/config.rb
···
1
-
Dir[File.join(__dir__, 'feeds', '*.rb')].each { |f| require(f) }
2
3
-
require 'blue_factory'
4
-
require 'sinatra/activerecord'
5
-
6
-
ActiveRecord::Base.connection.execute "PRAGMA journal_mode = WAL"
7
8
BlueFactory.set :publisher_did, 'did:plc:<your_identifier_here>'
9
BlueFactory.set :hostname, 'feeds.example.com'
···
12
# see Feed#get_posts(params, visitor_did) in app/feeds/feed.rb
13
# BlueFactory.set :enable_unsafe_auth, true
14
15
-
BlueFactory.add_feed 'build', BuildInPublicFeed.new
16
BlueFactory.add_feed 'linux', LinuxFeed.new
17
BlueFactory.add_feed 'starwars', StarWarsFeed.new
···
1
+
require_relative 'init'
2
3
+
Dir[File.join(__dir__, 'feeds', '*.rb')].each { |f| require(f) }
4
5
BlueFactory.set :publisher_did, 'did:plc:<your_identifier_here>'
6
BlueFactory.set :hostname, 'feeds.example.com'
···
9
# see Feed#get_posts(params, visitor_did) in app/feeds/feed.rb
10
# BlueFactory.set :enable_unsafe_auth, true
11
12
+
BlueFactory.add_feed 'kit', KitFeed.new
13
BlueFactory.add_feed 'linux', LinuxFeed.new
14
BlueFactory.add_feed 'starwars', StarWarsFeed.new
-47
app/feeds/build_in_public_feed.rb
-47
app/feeds/build_in_public_feed.rb
···
1
-
require_relative 'feed'
2
-
3
-
class BuildInPublicFeed < Feed
4
-
REGEXPS = [/\bbuild\s?in\s?public\b/i]
5
-
6
-
def feed_id
7
-
3
8
-
end
9
-
10
-
def display_name
11
-
"#buildinpublic"
12
-
end
13
-
14
-
def description
15
-
"Indie hackers and entrepreneurs building things in public - use #buildinpublic hashtag"
16
-
end
17
-
18
-
def post_matches?(post)
19
-
all_text = matched_text(post)
20
-
21
-
REGEXPS.any? { |x| all_text =~ x }
22
-
end
23
-
24
-
def matched_text(post)
25
-
lines = [post.text]
26
-
27
-
if embed = post.record['embed']
28
-
if images = (embed['images'] || embed['media'] && embed['media']['images'])
29
-
lines += images.map { |i| i['alt'] }.compact
30
-
end
31
-
32
-
if link = embed['external']
33
-
lines += [link['uri'], link['title'], link['description']].compact
34
-
end
35
-
end
36
-
37
-
lines.join("\n")
38
-
end
39
-
40
-
def colored_text(t)
41
-
text = t.dup
42
-
43
-
REGEXPS.each { |r| text.gsub!(r) { |s| Rainbow(s).green }}
44
-
45
-
text
46
-
end
47
-
end
···
+5
app/feeds/feed.rb
+5
app/feeds/feed.rb
+50
app/feeds/kit_feed.rb
+50
app/feeds/kit_feed.rb
···
···
1
+
require_relative 'feed'
2
+
3
+
# Feed of cat photos posted by a single account (PAUL). A post qualifies when
# its image alt text mentions Kit, or when the alt text mentions a cat and the
# post text mentions Kit.
class KitFeed < Feed
  # Patterns that name Kit directly.
  KIT_REGEX = [/\bkit\b/i].freeze

  # Generic cat words; they only count when the post text also matches KIT_REGEX.
  CAT_REGEX = [/\bcat\b/i, /\bkitty\b/i].freeze

  # DID of the only account whose posts are considered.
  PAUL = 'did:plc:ragtjsm2j2vknwkz3zp4oxrd'

  # Numeric id identifying this feed.
  def feed_id
    3
  end

  def display_name
    "Kit Feed"
  end

  def description
    "Photos of Paul's lovely cat Kit 🐱"
  end

  # Path of the feed's avatar image (presumably relative to the project root).
  def avatar_file
    "images/kitkat.jpg"
  end

  # Returns true when the post should be included in the feed.
  #
  # post - object exposing #repo, #text and #record.
  def post_matches?(post)
    return false unless post.repo == PAUL

    alt = embed_text(post)
    return false if alt.nil?

    return true if KIT_REGEX.any? { |r| r.match?(alt) }

    CAT_REGEX.any? { |r| r.match?(alt) } && KIT_REGEX.any? { |r| r.match?(post.text) }
  end

  # Returns the alt texts of all embedded images joined with newlines, or nil
  # when the post has no embed or the embed carries no images.
  def embed_text(post)
    embed = post.record['embed']
    return nil unless embed

    images = embed['images'] || (embed['media'] && embed['media']['images'])
    return nil unless images

    images.map { |i| i['alt'] }.compact.join("\n")
  end

  # Returns a copy of the text with KIT_REGEX matches in green and CAT_REGEX
  # matches in bright orange, for console output.
  def colored_text(t)
    text = t.dup

    KIT_REGEX.each { |r| text.gsub!(r) { |s| Rainbow(s).green } }
    CAT_REGEX.each { |r| text.gsub!(r) { |s| Rainbow(s).bright.orange } }

    text
  end
end
+16
-1
app/feeds/linux_feed.rb
+16
-1
app/feeds/linux_feed.rb
···
13
/the linux of/i, /linux (bros|nerds)/i, /ubuntu tv/i
14
]
15
16
MUTED_PROFILES = [
17
'did:plc:35c6qworuvguvwnpjwfq3b5p', # Linux Kernel Releases
18
'did:plc:ppuqidjyabv5iwzeoxt4fq5o', # GitHub Trending JS/TS
···
40
def post_matches?(post)
41
return false if MUTED_PROFILES.include?(post.repo)
42
43
-
REGEXPS.any? { |r| post.text =~ r } && !(EXCLUDE.any? { |r| post.text =~ r })
44
end
45
46
def colored_text(t)
47
text = t.dup
48
49
EXCLUDE.each { |r| text.gsub!(r) { |s| Rainbow(s).red }}
50
REGEXPS.each { |r| text.gsub!(r) { |s| Rainbow(s).green }}
51
52
text
···
13
/the linux of/i, /linux (bros|nerds)/i, /ubuntu tv/i
14
]
15
16
+
LINK_EXCLUDES = [
17
+
/\bamzn\.to\b/i, /\bwww\.amazon\.com\b/i, /\bmercadolivre\.com\b/i,
18
+
]
19
+
20
MUTED_PROFILES = [
21
'did:plc:35c6qworuvguvwnpjwfq3b5p', # Linux Kernel Releases
22
'did:plc:ppuqidjyabv5iwzeoxt4fq5o', # GitHub Trending JS/TS
···
44
# Returns true when the post belongs in the feed: the author is not muted, the
# text matches at least one of REGEXPS and none of EXCLUDE, and the post does
# not embed a forbidden link.
#
# post - object exposing #repo, #text and #record.
def post_matches?(post)
  return false if MUTED_PROFILES.include?(post.repo)

  text = post.text
  return false unless REGEXPS.any? { |r| r.match?(text) }
  return false if EXCLUDE.any? { |r| r.match?(text) }

  !has_forbidden_links?(post)
end
49
+
50
+
# Tells whether the post embeds an external link whose URI matches one of the
# LINK_EXCLUDES patterns. The link card is looked up both directly on the embed
# and under embed['media'].
#
# post - object exposing #record (a Hash with string keys).
#
# Returns true or false.
def has_forbidden_links?(post)
  embed = post.record['embed']
  return false unless embed

  link = embed['external'] || (embed['media'] && embed['media']['external'])
  return false unless link

  uri = link['uri']
  LINK_EXCLUDES.any? { |r| r.match?(uri) }
end
59
60
def colored_text(t)
61
text = t.dup
62
63
EXCLUDE.each { |r| text.gsub!(r) { |s| Rainbow(s).red }}
64
+
LINK_EXCLUDES.each { |r| text.gsub!(r) { |s| Rainbow(s).red }}
65
REGEXPS.each { |r| text.gsub!(r) { |s| Rainbow(s).green }}
66
67
text
+134
-21
app/firehose_stream.rb
+134
-21
app/firehose_stream.rb
···
6
require_relative 'models/feed_post'
7
require_relative 'models/post'
8
require_relative 'models/subscription'
9
10
class FirehoseStream
11
-
attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events
12
13
-
DEFAULT_RELAY = 'bsky.network'
14
15
def initialize(service = nil)
16
@env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
17
-
@service = service || DEFAULT_RELAY
18
19
@show_progress = (@env == :development) ? true : false
20
@log_status = true
···
23
@replay_events = (@env == :development) ? false : true
24
25
@feeds = BlueFactory.all_feeds.select(&:is_updating?)
26
end
27
28
def start
29
return if @sky
30
31
last_cursor = load_or_init_cursor
32
-
cursor = @replay_events ? last_cursor : nil
33
34
-
@sky = sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
35
36
@sky.on_message do |m|
37
process_message(m)
···
39
40
if @log_status
41
@sky.on_connecting { |u| log "Connecting to #{u}..." }
42
@sky.on_connect {
43
@replaying = !!(cursor)
44
log "Connected โ"
45
}
46
@sky.on_disconnect {
47
log "Disconnected."
48
-
save_cursor(sky.cursor)
49
}
50
-
@sky.on_reconnect { log "Connection lost, reconnecting..." }
51
@sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
52
end
53
···
55
end
56
57
def stop
58
@sky&.disconnect
59
@sky = nil
60
end
···
74
75
def process_message(msg)
76
if msg.type == :info
77
-
# AtProto error, the only one right now is "OutdatedCursor"
78
log "InfoMessage: #{msg}"
79
80
elsif msg.type == :identity
81
# use these events if you want to track handle changes:
82
# log "Handle change: #{msg.repo} => #{msg.handle}"
83
84
-
elsif msg.is_a?(Skyfall::Firehose::UnknownMessage)
85
log "Unknown message type: #{msg.type} (#{msg.seq})"
86
end
87
···
92
@replaying = false
93
end
94
95
-
if msg.seq % 10 == 0
96
save_cursor(msg.seq)
97
end
98
···
103
104
when :bsky_like, :bsky_repost
105
# if you want to use the number of likes and/or reposts for filtering or sorting:
106
-
# add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action
107
108
when :bsky_follow
109
# if you want to make a personalized feed that needs info about given user's follows/followers:
···
115
end
116
end
117
118
def process_post(msg, op)
119
if op.action == :delete
120
-
if post = Post.find_by(repo: op.repo, rkey: op.rkey)
121
post.destroy
122
end
123
end
···
143
return
144
end
145
146
-
text = op.raw_record['text']
147
148
# to save space, delete redundant post text and type from the saved data JSON
149
-
trimmed_record = op.raw_record.dup
150
-
trimmed_record.delete('$type')
151
-
trimmed_record.delete('text')
152
-
trimmed_json = JSON.generate(trimmed_record)
153
154
# tip: if you don't need full record data for debugging, delete the data column in posts
155
post = Post.new(
···
158
text: text,
159
rkey: op.rkey,
160
data: trimmed_json,
161
-
record: op.raw_record
162
)
163
164
matched = false
165
166
@feeds.each do |feed|
167
if feed.post_matches?(post)
168
-
FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) unless !@save_posts
169
matched = true
170
end
171
end
···
175
puts text
176
end
177
178
-
post.save! if @save_posts == :all
179
180
print '.' if @show_progress && @log_posts != :all
181
rescue StandardError => e
···
187
end
188
end
189
190
def log(text)
191
puts if @show_progress
192
puts "[#{Time.now}] #{text}"
193
end
194
195
def inspect
196
-
vars = instance_variables - [:@feeds, :@timer]
197
values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
198
"#<#{self.class}:0x#{object_id} #{values}>"
199
end
···
6
require_relative 'models/feed_post'
7
require_relative 'models/post'
8
require_relative 'models/subscription'
9
+
require_relative 'utils'
10
11
class FirehoseStream
12
+
attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events
13
14
+
DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
15
+
POSTS_BATCH_SIZE = 100
16
+
17
+
include Utils
18
19
def initialize(service = nil)
20
@env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
21
+
@service = service || DEFAULT_JETSTREAM
22
23
@show_progress = (@env == :development) ? true : false
24
@log_status = true
···
27
@replay_events = (@env == :development) ? false : true
28
29
@feeds = BlueFactory.all_feeds.select(&:is_updating?)
30
+
@post_queue = []
31
end
32
33
def start
34
return if @sky
35
36
last_cursor = load_or_init_cursor
37
+
cursor = @replay_events ? (@start_cursor || last_cursor) : nil
38
+
39
+
@sky = sky = Skyfall::Jetstream.new(@service, {
40
+
cursor: cursor,
41
+
42
+
# we ask Jetstream to only send us post records, since we don't need anything else
43
+
# if you need to process e.g. likes or follows too, update or remove this param
44
+
wanted_collections: ['app.bsky.feed.post'],
45
+
})
46
47
+
# set your user agent here to identify yourself on the relay
48
+
# @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"
49
+
50
+
@sky.check_heartbeat = true
51
52
@sky.on_message do |m|
53
process_message(m)
···
55
56
if @log_status
57
@sky.on_connecting { |u| log "Connecting to #{u}..." }
58
+
59
@sky.on_connect {
60
+
@message_counter = 0
61
@replaying = !!(cursor)
62
log "Connected โ"
63
}
64
+
65
@sky.on_disconnect {
66
log "Disconnected."
67
+
}
68
+
69
+
@sky.on_reconnect {
70
+
log "Connection lost, reconnecting..."
71
+
}
72
+
73
+
@sky.on_timeout {
74
+
log "Trying to reconnect..."
75
}
76
+
77
@sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
78
end
79
···
81
end
82
83
# Shuts the stream down: flushes queued posts, then (if a connection object
# exists) stores its current cursor and disconnects, and finally clears @sky
# so that #start can be called again.
def stop
  save_queued_posts

  if @sky
    # read the cursor before disconnecting
    save_cursor(@sky.cursor)
    @sky.disconnect
  end

  @sky = nil
end
···
103
104
def process_message(msg)
105
if msg.type == :info
106
+
# ATProto error, the only one right now is "OutdatedCursor"
107
log "InfoMessage: #{msg}"
108
109
elsif msg.type == :identity
110
# use these events if you want to track handle changes:
111
# log "Handle change: #{msg.repo} => #{msg.handle}"
112
113
+
elsif msg.type == :account
114
+
# tracking account status changes, e.g. suspensions, deactivations and deletes
115
+
process_account_message(msg)
116
+
117
+
elsif msg.unknown?
118
log "Unknown message type: #{msg.type} (#{msg.seq})"
119
end
120
···
125
@replaying = false
126
end
127
128
+
@message_counter += 1
129
+
130
+
if @message_counter % 100 == 0
131
+
# save current cursor every 100 events
132
save_cursor(msg.seq)
133
end
134
···
139
140
when :bsky_like, :bsky_repost
141
# if you want to use the number of likes and/or reposts for filtering or sorting:
142
+
# add a likes/reposts table, then add/remove records here depending on op.action
143
+
# (you'll need to track like records and not just have a single numeric "likes" field,
144
+
# because delete events only include the uri/rkey of the like, not of the liked post)
145
146
when :bsky_follow
147
# if you want to make a personalized feed that needs info about given user's follows/followers:
···
153
end
154
end
155
156
+
# Handles an account status event. Only the :deleted status is acted upon;
# every other status is ignored.
#
# msg - account event exposing #status and #did.
def process_account_message(msg)
  return unless msg.status == :deleted

  # delete all data we have stored about this account
  # (feed entries first, since they are found through their posts)
  FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
  Post.where(repo: msg.did).delete_all
end
163
+
164
def process_post(msg, op)
165
if op.action == :delete
166
+
if post = Post.find_by_repo_rkey(op.repo, op.rkey)
167
post.destroy
168
end
169
end
···
189
return
190
end
191
192
+
record = op.raw_record
193
+
text = record['text']
194
195
# to save space, delete redundant post text and type from the saved data JSON
196
+
record.delete('$type')
197
+
record.delete('text')
198
+
trimmed_json = JSON.generate(record)
199
200
# tip: if you don't need full record data for debugging, delete the data column in posts
201
post = Post.new(
···
204
text: text,
205
rkey: op.rkey,
206
data: trimmed_json,
207
+
record: record
208
)
209
210
+
if !post.valid?
211
+
if post.errors.has_key?(:data)
212
+
post.trim_too_long_data
213
+
end
214
+
215
+
if !post.valid?
216
+
log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
217
+
return
218
+
end
219
+
end
220
+
221
matched = false
222
223
@feeds.each do |feed|
224
if feed.post_matches?(post)
225
+
post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) unless !@save_posts
226
matched = true
227
end
228
end
···
232
puts text
233
end
234
235
+
if @save_posts == :all
236
+
# wait until we have 100 posts and then save them all in one insert, if possible
237
+
@post_queue << post
238
+
239
+
if @post_queue.length >= POSTS_BATCH_SIZE
240
+
save_queued_posts
241
+
end
242
+
elsif @save_posts == :matching && matched
243
+
# save immediately because matched posts might be rare; we've already checked validations
244
+
post.save!(validate: false)
245
+
end
246
247
print '.' if @show_progress && @log_posts != :all
248
rescue StandardError => e
···
254
end
255
end
256
257
+
# Writes every post in @post_queue to the database and empties the queue.
#
# Posts without any built feed_posts go out in a single Post.insert_all; posts
# that do have feed_posts are saved normally inside one transaction. If any of
# that raises, each post still in the queue is saved on its own and individual
# failures are logged instead of being raised.
def save_queued_posts
  # we can only squash posts into one insert statement if they don't have nested feed_posts
  # so we save those without feed_posts first:
  matched, unmatched = @post_queue.partition { |post| post.feed_posts.length > 0 }

  unless unmatched.empty?
    Post.insert_all(unmatched.map { |post| post.attributes.except('id') })
  end

  @post_queue = matched
  return if @post_queue.empty?

  # and for those that do have feed_posts, we save them normally, in one transaction:
  ActiveRecord::Base.transaction do
    # skip validations since we've checked the posts before adding them to the queue
    @post_queue.each { |post| post.save!(validate: false) }
  end

  @post_queue = []
rescue StandardError
  # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
  # aren't covered by AR validations; so in that case, try to save any valid posts one by one
  # (if insert_all itself failed, the queue still holds the posts without feed_posts too)
  @post_queue.each do |post|
    ActiveRecord::Base.transaction { post.save!(validate: false) }
  rescue StandardError => e
    log "Error in #save_queued_posts: #{e}"

    unless e.message == "nesting of 100 is too deep"
      log post.inspect
      log e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  @post_queue = []
end
302
+
303
# Prints a timestamped line to standard output.
#
# text - the message to print.
def log(text)
  # when progress output is on, start on a fresh line first
  puts if @show_progress
  puts "[#{Time.now}] #{text}"
end
307
308
def inspect
309
+
vars = instance_variables - [:@feeds]
310
values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
311
"#<#{self.class}:0x#{object_id} #{values}>"
312
end
+26
app/init.rb
+26
app/init.rb
···
···
1
+
require 'blue_factory'
2
+
require 'sinatra/activerecord'
3
+
4
+
# Turn YJIT on when this Ruby can enable it at runtime; otherwise print a note
# explaining how to get it.
if defined?(RubyVM::YJIT) && RubyVM::YJIT.respond_to?(:enabled?)
  unless RubyVM::YJIT.enabled?
    if RubyVM::YJIT.respond_to?(:enable)
      # Ruby 3.3+
      RubyVM::YJIT.enable
    else
      # Ruby 3.2
      puts "-" * 106
      puts "Note: YJIT is not enabled. To improve performance, enable it by adding an ENV var RUBYOPT=\"--enable-yjit\"."
      puts "-" * 106
    end
  end
else
  # compare as versions rather than floats: "3.10".to_f would evaluate to 3.1
  yjit_capable = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.2')
  advice = yjit_capable ? "install Ruby with YJIT support turned on." : "update to a newer Ruby with YJIT support."

  puts "-" * 112
  puts "Note: YJIT is not enabled. To improve performance, it's recommended to " + advice
  puts "-" * 112
end
22
+
23
+
# Switch the database to write-ahead logging. The ActiveRecord logger is
# silenced while the PRAGMA runs and is restored afterwards, even if the
# statement raises.
ar_logger = ActiveRecord::Base.logger
ActiveRecord::Base.logger = nil

begin
  ActiveRecord::Base.connection.execute "PRAGMA journal_mode = WAL"
ensure
  ActiveRecord::Base.logger = ar_logger
end
+56
-1
app/models/post.rb
+56
-1
app/models/post.rb
···
1
require 'active_record'
2
require 'json'
3
4
class Post < ActiveRecord::Base
5
validates_presence_of :repo, :time, :data, :rkey
6
validates :text, length: { minimum: 0, allow_nil: false }
7
8
has_many :feed_posts, dependent: :destroy
9
10
attr_writer :record
11
12
def self.find_by_at_uri(uri)
13
parts = uri.gsub(%r(^at://), '').split('/')
14
return nil unless parts.length == 3 && parts[1] == 'app.bsky.feed.post'
15
16
-
Post.find_by(repo: parts[0], rkey: parts[2])
17
end
18
19
def record
···
22
23
def at_uri
24
"at://#{repo}/app.bsky.feed.post/#{rkey}"
25
end
26
end
···
1
require 'active_record'
2
require 'json'
3
4
+
require_relative 'feed_post'
5
+
6
class Post < ActiveRecord::Base
7
validates_presence_of :repo, :time, :data, :rkey
8
validates :text, length: { minimum: 0, allow_nil: false }
9
+
validates_length_of :repo, maximum: 60
10
+
validates_length_of :rkey, maximum: 16
11
+
validates_length_of :text, maximum: 1000
12
+
validates_length_of :data, maximum: 10000
13
14
has_many :feed_posts, dependent: :destroy
15
16
attr_writer :record
17
18
+
# Looks up a post by its author's DID and record key.
#
# repo - the author's DID.
# rkey - the record key of the post.
#
# Returns the first matching Post, or nil when there is none.
def self.find_by_repo_rkey(repo, rkey)
  # the '+' is to make sure that SQLite uses the rkey index and not a different one
  Post.where("+repo = ?", repo).where(rkey: rkey).first
end
22
+
23
# Looks up a post by its at:// URI (the "at://" scheme prefix is optional).
#
# uri - string of the form "at://<repo>/app.bsky.feed.post/<rkey>".
#
# Returns the matching Post, or nil when the URI does not have exactly three
# path segments or does not point at an app.bsky.feed.post record.
def self.find_by_at_uri(uri)
  # strip the scheme only at the very start of the string
  parts = uri.delete_prefix('at://').split('/')
  return nil unless parts.length == 3 && parts[1] == 'app.bsky.feed.post'

  repo, _collection, rkey = parts
  find_by_repo_rkey(repo, rkey)
end
29
30
def record
···
33
34
def at_uri
35
"at://#{repo}/app.bsky.feed.post/#{rkey}"
36
+
end
37
+
38
+
# Returns the URI of the record quoted by this post, or nil when the post has
# no quote embed. Both plain record embeds and record-with-media embeds are
# recognised; embeds that lack the expected nested hashes yield nil instead of
# raising.
def quoted_post_uri
  embed = record['embed']
  return nil unless embed.is_a?(Hash)

  case embed['$type']
  when "app.bsky.embed.record"
    quoted = embed['record']
  when "app.bsky.embed.recordWithMedia"
    # here the quote is wrapped in one more app.bsky.embed.record object
    wrapper = embed['record']
    return nil unless wrapper.is_a?(Hash) && wrapper['$type'] == "app.bsky.embed.record"

    quoted = wrapper['record']
  else
    return nil
  end

  quoted.is_a?(Hash) ? quoted['uri'] : nil
end
51
+
52
+
# Returns the URI stored under reply.root in the record, or nil when the post
# is not a reply (or the reply has no root).
def thread_root_uri
  reply = record['reply']
  root = reply && reply['root']

  root ? root['uri'] : nil
end
59
+
60
+
# Returns the URI stored under reply.parent in the record, or nil when the
# post is not a reply (or the reply has no parent).
def parent_uri
  reply = record['reply']
  parent = reply && reply['parent']

  parent ? parent['uri'] : nil
end
67
+
68
+
# Shrinks the stored record by blanking fields that can be very long, then
# regenerates the data JSON from the trimmed record. The record hash is
# modified in place.
#
# Blanked fields: the description of an external link card (found either
# directly on the embed or under embed['media']) and 'bridgyOriginalText'.
def trim_too_long_data
  if embed = record['embed']
    external = embed['external'] || (embed['media'] && embed['media']['external'])
    external['description'] = '' if external
  end

  record['bridgyOriginalText'] = '' if record['bridgyOriginalText']

  self.data = JSON.generate(record)
end
81
end
+8
app/post_console_printer.rb
+8
app/post_console_printer.rb
···
9
10
def display(post)
11
print Rainbow(post.time).bold + ' * ' + Rainbow(post.id).bold + ' * '
12
+
13
+
langs = post.record['langs']
14
+
if langs.nil?
15
+
print '[nil] * '
16
+
elsif langs != ['en']
17
+
print "[#{langs.join(', ')}] * "
18
+
end
19
+
20
puts Rainbow("https://bsky.app/profile/#{post.repo}/post/#{post.rkey}").darkgray
21
puts
22
puts @feed.colored_text(post.text)
+2
app/server.rb
+2
app/server.rb
+3
-8
app/utils.rb
+3
-8
app/utils.rb
···
1
-
require 'json'
2
-
require 'open-uri'
3
4
module Utils
5
def handle_from_did(did)
6
-
url = "https://plc.directory/#{did}"
7
-
json = JSON.parse(URI.open(url).read)
8
-
json['alsoKnownAs'][0].gsub('at://', '')
9
end
10
11
def did_from_handle(handle)
12
-
url = "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=#{handle}"
13
-
json = JSON.parse(URI.open(url).read)
14
-
json['did']
15
end
16
17
extend self
+18
-3
bin/firehose
+18
-3
bin/firehose
···
7
8
$stdout.sync = true
9
10
-
ActiveRecord::Base.logger = nil
11
12
def print_help
13
puts "Usage: #{$0} [options...]"
···
33
puts " * Replaying missed events: [default: -nr in development, -r in production]"
34
puts " -r = pass a cursor param when connecting to replay any missed events"
35
puts " -nr = don't replay missed events"
36
end
37
38
firehose = FirehoseStream.new(ENV['FIREHOSE'])
···
59
firehose.save_posts = false
60
when '-r'
61
firehose.replay_events = true
62
when '-nr'
63
firehose.replay_events = false
64
when '-h', '--help'
···
72
end
73
74
trap("SIGINT") {
75
firehose.log "Stopping..."
76
-
firehose.stop
77
}
78
79
trap("SIGTERM") {
80
firehose.log "Shutting down the service..."
81
-
firehose.stop
82
}
83
84
firehose.start
···
7
8
$stdout.sync = true
9
10
+
if ENV['ARLOG'] == '1'
11
+
ActiveRecord::Base.logger = Logger.new(STDOUT)
12
+
else
13
+
ActiveRecord::Base.logger = nil
14
+
end
15
16
def print_help
17
puts "Usage: #{$0} [options...]"
···
37
puts " * Replaying missed events: [default: -nr in development, -r in production]"
38
puts " -r = pass a cursor param when connecting to replay any missed events"
39
puts " -nr = don't replay missed events"
40
+
puts " -r12345 = start from this specific cursor"
41
end
42
43
firehose = FirehoseStream.new(ENV['FIREHOSE'])
···
64
firehose.save_posts = false
65
when '-r'
66
firehose.replay_events = true
67
+
when /^\-r(\d+)$/
68
+
firehose.replay_events = true
69
+
firehose.start_cursor = $1.to_i
70
when '-nr'
71
firehose.replay_events = false
72
when '-h', '--help'
···
80
end
81
82
trap("SIGINT") {
83
+
puts
84
firehose.log "Stopping..."
85
+
86
+
EM.add_timer(0) {
87
+
firehose.stop
88
+
}
89
}
90
91
trap("SIGTERM") {
92
firehose.log "Shutting down the service..."
93
+
94
+
EM.add_timer(0) {
95
+
firehose.stop
96
+
}
97
}
98
99
firehose.start
+1
-1
config/database.yml
+1
-1
config/database.yml
+1
-1
config/deploy.rb
+1
-1
config/deploy.rb
+15
db/migrate/20241017135717_add_missing_limits.rb
+15
db/migrate/20241017135717_add_missing_limits.rb
···
···
1
+
# Adds length limits to the repo (60) and rkey (16) string columns of the
# posts table; rolling back removes the limits again.
class AddMissingLimits < ActiveRecord::Migration[6.1]
  def up
    change_table :posts do |t|
      t.change :repo, :string, limit: 60, null: false
      t.change :rkey, :string, limit: 16, null: false
    end
  end

  def down
    change_table :posts do |t|
      t.change :repo, :string, null: false
      t.change :rkey, :string, null: false
    end
  end
end
+5
db/migrate/20241022002658_add_feed_posts_post_index.rb
+5
db/migrate/20241022002658_add_feed_posts_post_index.rb
+4
-3
db/schema.rb
+4
-3
db/schema.rb
···
10
#
11
# It's strongly recommended that you check this file into your version control system.
12
13
-
ActiveRecord::Schema[8.0].define(version: 2024_06_04_130301) do
14
create_table "feed_posts", force: :cascade do |t|
15
t.integer "feed_id", null: false
16
t.integer "post_id", null: false
17
t.datetime "time", precision: nil, null: false
18
t.index ["feed_id", "time"], name: "index_feed_posts_on_feed_id_and_time"
19
end
20
21
create_table "posts", force: :cascade do |t|
22
-
t.string "repo", null: false
23
t.datetime "time", precision: nil, null: false
24
t.string "text", null: false
25
t.text "data", null: false
26
-
t.string "rkey", null: false
27
t.index ["repo", "time"], name: "index_posts_on_repo_and_time"
28
t.index ["rkey"], name: "index_posts_on_rkey"
29
t.index ["time"], name: "index_posts_on_time"
···
10
#
11
# It's strongly recommended that you check this file into your version control system.
12
13
+
ActiveRecord::Schema[8.0].define(version: 2024_10_22_002658) do
14
create_table "feed_posts", force: :cascade do |t|
15
t.integer "feed_id", null: false
16
t.integer "post_id", null: false
17
t.datetime "time", precision: nil, null: false
18
t.index ["feed_id", "time"], name: "index_feed_posts_on_feed_id_and_time"
19
+
t.index ["post_id"], name: "index_feed_posts_on_post_id"
20
end
21
22
create_table "posts", force: :cascade do |t|
23
+
t.string "repo", limit: 60, null: false
24
t.datetime "time", precision: nil, null: false
25
t.string "text", null: false
26
t.text "data", null: false
27
+
t.string "rkey", limit: 16, null: false
28
t.index ["repo", "time"], name: "index_posts_on_repo_and_time"
29
t.index ["rkey"], name: "index_posts_on_rkey"
30
t.index ["time"], name: "index_posts_on_time"
images/kitkat.jpg
images/kitkat.jpg
This is a binary file and will not be displayed.
images/ruby.png
images/ruby.png
This is a binary file and will not be displayed.
+34
-23
lib/tasks/feeds.rake
+34
-23
lib/tasks/feeds.rake
···
119
method = ENV['UNSAFE'] ? :tap : :transaction
120
dry = !!ENV['DRY_RUN']
121
122
ActiveRecord::Base.send(method) do
123
if ENV['ONLY_EXISTING']
124
rescan_feed_items(feed, dry)
···
136
end
137
138
def rescan_feed_items(feed, dry = false)
139
-
feed_posts = FeedPost.where(feed_id: feed.feed_id).includes(:post).to_a
140
total = feed_posts.length
141
142
puts "Processing posts..."
143
144
-
deleted = 0
145
146
feed_posts.each do |fp|
147
if !feed.post_matches?(fp.post)
148
-
if dry
149
-
puts "Post would be deleted: ##{fp.post.id} \"#{fp.post.text}\""
150
-
else
151
puts "Deleting from feed: ##{fp.post.id} \"#{fp.post.text}\""
152
fp.destroy
153
end
154
-
deleted += 1
155
end
156
end
157
158
if dry
159
-
puts "#{deleted} post(s) would be deleted."
160
else
161
-
puts "Done (#{deleted} post(s) deleted)."
162
end
163
end
164
165
def rebuild_feed(feed, days, append_only, dry = false)
166
-
posts = Post.order('time, id')
167
-
start = posts.where("time <= DATETIME('now', '-#{days} days')").last
168
-
stop = posts.last
169
-
first = posts.first
170
-
total = start ? (stop.id - start.id + 1) : (stop.id - first.id + 1)
171
-
172
if append_only
173
feed_posts = FeedPost.where(feed_id: feed.feed_id)
174
current_post_ids = Set.new(feed_posts.pluck('post_id'))
···
182
current_post_ids = []
183
end
184
185
offset = 0
186
page = 100000
187
matched_posts = []
···
196
break if batch.empty?
197
198
batch.each_with_index do |post, i|
199
-
$stderr.print "Processing posts... [#{offset + i + 1}/#{total}]\r"
200
$stderr.flush
201
202
if !current_post_ids.include?(post.id) && feed.post_matches?(post)
203
-
if dry
204
-
matched_posts << post
205
-
else
206
-
FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time)
207
-
end
208
end
209
end
210
···
214
215
$stderr.puts "Processing posts... Done." + " " * 30
216
217
-
if dry
218
if append_only
219
-
puts "Added posts:"
220
puts "=============================="
221
puts
222
end
···
224
Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
225
printer = PostConsolePrinter.new(feed)
226
227
-
matched_posts.each do |p|
228
printer.display(p)
229
end
230
···
119
method = ENV['UNSAFE'] ? :tap : :transaction
120
dry = !!ENV['DRY_RUN']
121
122
+
if ENV['ONLY_EXISTING'] && ENV['APPEND_ONLY']
123
+
raise "APPEND_ONLY cannot be used together with ONLY_EXISTING"
124
+
end
125
+
126
ActiveRecord::Base.send(method) do
127
if ENV['ONLY_EXISTING']
128
rescan_feed_items(feed, dry)
···
140
end
141
142
def rescan_feed_items(feed, dry = false)
143
+
feed_posts = FeedPost.where(feed_id: feed.feed_id).includes(:post).order('time DESC').to_a
144
total = feed_posts.length
145
146
puts "Processing posts..."
147
148
+
deleted = []
149
150
feed_posts.each do |fp|
151
if !feed.post_matches?(fp.post)
152
+
if !dry
153
puts "Deleting from feed: ##{fp.post.id} \"#{fp.post.text}\""
154
fp.destroy
155
end
156
+
157
+
deleted << fp.post
158
end
159
end
160
161
if dry
162
+
Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
163
+
printer = PostConsolePrinter.new(feed)
164
+
165
+
puts
166
+
puts Rainbow("Posts to delete:").red
167
+
puts Rainbow("==============================").red
168
+
puts
169
+
170
+
deleted.each do |p|
171
+
printer.display(p)
172
+
end
173
+
174
+
puts Rainbow("#{deleted.length} post(s) would be deleted.").red
175
else
176
+
puts "Done (#{deleted.length} post(s) deleted)."
177
end
178
end
179
180
def rebuild_feed(feed, days, append_only, dry = false)
181
if append_only
182
feed_posts = FeedPost.where(feed_id: feed.feed_id)
183
current_post_ids = Set.new(feed_posts.pluck('post_id'))
···
191
current_post_ids = []
192
end
193
194
+
puts "Counting posts..."
195
+
posts = Post.order('time, id')
196
+
start = posts.where("time <= DATETIME('now', '-#{days} days')").last
197
+
total = start ? Post.where("time > DATETIME('now', '-#{days} days')").count : Post.count
198
+
199
offset = 0
200
page = 100000
201
matched_posts = []
···
210
break if batch.empty?
211
212
batch.each_with_index do |post, i|
213
+
$stderr.print "Processing posts... [#{offset + i + 1}/#{total}]\r" if i % 100 == 99
214
$stderr.flush
215
216
if !current_post_ids.include?(post.id) && feed.post_matches?(post)
217
+
matched_posts << post
218
+
FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time) unless dry
219
end
220
end
221
···
225
226
$stderr.puts "Processing posts... Done." + " " * 30
227
228
+
if dry || ENV['VERBOSE']
229
if append_only
230
+
puts (dry ? "Posts to add: " : "Added posts: ") + matched_posts.length.to_s
231
puts "=============================="
232
puts
233
end
···
235
Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
236
printer = PostConsolePrinter.new(feed)
237
238
+
matched_posts.reverse.each do |p|
239
printer.display(p)
240
end
241
+3
-1
lib/tasks/posts.rake
+3
-1
lib/tasks/posts.rake
···
21
WHERE feed_posts.id IS NULL AND posts.time < DATETIME('now', '-#{days} days')
22
}
23
24
+
time_start = Time.now
25
result = Post.where("id IN (#{subquery})").delete_all
26
+
time_end = Time.now
27
28
+
puts "#{time_end}: Deleted #{result} posts older than #{time_limit} in #{(time_end - time_start)}s"
29
end