Template of a custom feed generator service for the Bluesky network in Ruby

Compare changes

Choose any two refs to compare.

+1
.gitignore
··· 1 db/*.sqlite3* 2 log
··· 1 + .DS_Store 2 db/*.sqlite3* 3 log
+15 -5
Gemfile
··· 1 source "https://rubygems.org" 2 3 - gem 'blue_factory', '~> 0.1.2' 4 - gem 'skyfall', '~> 0.1.2' 5 6 - gem 'activerecord', '~> 6.0' 7 gem 'sinatra-activerecord', '~> 2.0' 8 - gem 'sqlite3' 9 gem 'rake' 10 11 group :development do 12 - gem 'webrick' 13 gem 'capistrano', '~> 2.0' 14 end
··· 1 source "https://rubygems.org" 2 3 + gem 'blue_factory', '~> 0.1.6' 4 + gem 'skyfall', '~> 0.6' 5 + gem 'didkit', '~> 0.2' 6 7 + gem 'activerecord', '~> 8.0' 8 gem 'sinatra-activerecord', '~> 2.0' 9 + gem 'sqlite3', '~> 2.5' 10 gem 'rake' 11 + gem 'rainbow' 12 + gem 'irb' 13 14 group :development do 15 + gem 'debug' 16 + gem 'thin' 17 gem 'capistrano', '~> 2.0' 18 + 19 + # for net-ssh & capistrano 20 + gem 'ed25519', '>= 1.2', '< 2.0' 21 + gem 'bcrypt_pbkdf', '>= 1.0', '< 2.0' 22 end 23 + 24 + gem 'ostruct' # for rake, to remove when rake is updated
+106 -44
Gemfile.lock
··· 1 GEM 2 remote: https://rubygems.org/ 3 specs: 4 - activemodel (6.1.7.3) 5 - activesupport (= 6.1.7.3) 6 - activerecord (6.1.7.3) 7 - activemodel (= 6.1.7.3) 8 - activesupport (= 6.1.7.3) 9 - activesupport (6.1.7.3) 10 - concurrent-ruby (~> 1.0, >= 1.0.2) 11 i18n (>= 1.6, < 2) 12 minitest (>= 5.1) 13 - tzinfo (~> 2.0) 14 - zeitwerk (~> 2.3) 15 base32 (0.3.4) 16 - blue_factory (0.1.2) 17 sinatra (~> 3.0) 18 - capistrano (2.15.10) 19 highline 20 net-scp (>= 1.0.0) 21 net-sftp (>= 2.0.0) 22 net-ssh (>= 2.0.14) 23 net-ssh-gateway (>= 1.1.0) 24 - cbor (0.5.9.6) 25 - concurrent-ruby (1.2.2) 26 - event_emitter (0.2.6) 27 - highline (2.1.0) 28 - i18n (1.14.1) 29 concurrent-ruby (~> 1.0) 30 - minitest (5.18.0) 31 - mustermann (3.0.0) 32 ruby2_keywords (~> 0.0.1) 33 - net-scp (4.0.0) 34 net-ssh (>= 2.6.5, < 8.0.0) 35 net-sftp (4.0.0) 36 net-ssh (>= 5.0.0, < 8.0.0) 37 - net-ssh (7.1.0) 38 net-ssh-gateway (2.0.0) 39 net-ssh (>= 4.0.0) 40 - rack (2.2.7) 41 - rack-protection (3.0.6) 42 - rack 43 - rake (13.0.6) 44 ruby2_keywords (0.0.5) 45 - sinatra (3.0.6) 46 mustermann (~> 3.0) 47 rack (~> 2.2, >= 2.2.4) 48 - rack-protection (= 3.0.6) 49 tilt (~> 2.0) 50 - sinatra-activerecord (2.0.26) 51 activerecord (>= 4.1) 52 sinatra (>= 1.0) 53 - skyfall (0.1.2) 54 base32 (~> 0.3, >= 0.3.4) 55 cbor (~> 0.5, >= 0.5.9.6) 56 - websocket-client-simple (~> 0.6, >= 0.6.1) 57 - sqlite3 (1.6.3-arm64-darwin) 58 - sqlite3 (1.6.3-x86_64-linux) 59 - tilt (2.2.0) 60 tzinfo (2.0.6) 61 concurrent-ruby (~> 1.0) 62 - webrick (1.8.1) 63 - websocket (1.2.9) 64 - websocket-client-simple (0.6.1) 65 - event_emitter 66 - websocket 67 - zeitwerk (2.6.8) 68 69 PLATFORMS 70 arm64-darwin 71 x86_64-linux 72 73 DEPENDENCIES 74 - activerecord (~> 6.0) 75 - blue_factory (~> 0.1.2) 76 capistrano (~> 2.0) 77 rake 78 sinatra-activerecord (~> 2.0) 79 - skyfall (~> 0.1.2) 80 - sqlite3 81 - webrick 82 83 BUNDLED WITH 84 - 2.4.14
··· 1 GEM 2 remote: https://rubygems.org/ 3 specs: 4 + activemodel (8.0.2) 5 + activesupport (= 8.0.2) 6 + activerecord (8.0.2) 7 + activemodel (= 8.0.2) 8 + activesupport (= 8.0.2) 9 + timeout (>= 0.4.0) 10 + activesupport (8.0.2) 11 + base64 12 + benchmark (>= 0.3) 13 + bigdecimal 14 + concurrent-ruby (~> 1.0, >= 1.3.1) 15 + connection_pool (>= 2.2.5) 16 + drb 17 i18n (>= 1.6, < 2) 18 + logger (>= 1.4.2) 19 minitest (>= 5.1) 20 + securerandom (>= 0.3) 21 + tzinfo (~> 2.0, >= 2.0.5) 22 + uri (>= 0.13.1) 23 base32 (0.3.4) 24 + base64 (0.3.0) 25 + bcrypt_pbkdf (1.1.1) 26 + benchmark (0.4.1) 27 + bigdecimal (3.2.2) 28 + blue_factory (0.1.6) 29 sinatra (~> 3.0) 30 + capistrano (2.15.11) 31 highline 32 net-scp (>= 1.0.0) 33 net-sftp (>= 2.0.0) 34 net-ssh (>= 2.0.14) 35 net-ssh-gateway (>= 1.1.0) 36 + cbor (0.5.9.8) 37 + concurrent-ruby (1.3.5) 38 + connection_pool (2.5.3) 39 + daemons (1.4.1) 40 + date (3.4.1) 41 + debug (1.11.0) 42 + irb (~> 1.10) 43 + reline (>= 0.3.8) 44 + didkit (0.2.3) 45 + drb (2.2.3) 46 + ed25519 (1.4.0) 47 + erb (5.0.2) 48 + eventmachine (1.2.7) 49 + faye-websocket (0.12.0) 50 + eventmachine (>= 0.12.0) 51 + websocket-driver (>= 0.8.0) 52 + highline (3.1.2) 53 + reline 54 + i18n (1.14.7) 55 concurrent-ruby (~> 1.0) 56 + io-console (0.8.1) 57 + irb (1.15.2) 58 + pp (>= 0.6.0) 59 + rdoc (>= 4.0.0) 60 + reline (>= 0.4.2) 61 + logger (1.7.0) 62 + minitest (5.25.5) 63 + mustermann (3.0.3) 64 ruby2_keywords (~> 0.0.1) 65 + net-scp (4.1.0) 66 net-ssh (>= 2.6.5, < 8.0.0) 67 net-sftp (4.0.0) 68 net-ssh (>= 5.0.0, < 8.0.0) 69 + net-ssh (7.3.0) 70 net-ssh-gateway (2.0.0) 71 net-ssh (>= 4.0.0) 72 + ostruct (0.6.2) 73 + pp (0.6.2) 74 + prettyprint 75 + prettyprint (0.2.0) 76 + psych (5.2.6) 77 + date 78 + stringio 79 + rack (2.2.17) 80 + rack-protection (3.2.0) 81 + base64 (>= 0.1.0) 82 + rack (~> 2.2, >= 2.2.4) 83 + rainbow (3.1.1) 84 + rake (13.3.0) 85 + rdoc (6.14.2) 86 + erb 87 + psych (>= 4.0.0) 88 + reline (0.6.1) 89 + io-console (~> 0.5) 90 ruby2_keywords (0.0.5) 91 + securerandom (0.4.1) 92 + sinatra (3.2.0) 93 mustermann (~> 3.0) 94 rack (~> 2.2, >= 2.2.4) 95 + rack-protection (= 3.2.0) 96 tilt (~> 2.0) 97 + sinatra-activerecord (2.0.28) 98 activerecord (>= 4.1) 99 sinatra (>= 1.0) 100 + skyfall (0.6.0) 101 base32 (~> 0.3, >= 0.3.4) 102 + base64 (~> 0.1) 103 cbor (~> 0.5, >= 0.5.9.6) 104 + eventmachine (~> 1.2, >= 1.2.7) 105 + faye-websocket (~> 0.12) 106 + sqlite3 (2.7.2-arm64-darwin) 107 + sqlite3 (2.7.2-x86_64-linux-gnu) 108 + stringio (3.1.7) 109 + thin (2.0.1) 110 + daemons (~> 1.0, >= 1.0.9) 111 + eventmachine (~> 1.0, >= 1.0.4) 112 + logger 113 + rack (>= 1, < 4) 114 + tilt (2.6.1) 115 + timeout (0.4.3) 116 tzinfo (2.0.6) 117 concurrent-ruby (~> 1.0) 118 + uri (1.0.3) 119 + websocket-driver (0.8.0) 120 + base64 121 + websocket-extensions (>= 0.1.0) 122 + websocket-extensions (0.1.5) 123 124 PLATFORMS 125 arm64-darwin 126 x86_64-linux 127 128 DEPENDENCIES 129 + activerecord (~> 8.0) 130 + bcrypt_pbkdf (>= 1.0, < 2.0) 131 + blue_factory (~> 0.1.6) 132 capistrano (~> 2.0) 133 + debug 134 + didkit (~> 0.2) 135 + ed25519 (>= 1.2, < 2.0) 136 + irb 137 + ostruct 138 + rainbow 139 rake 140 sinatra-activerecord (~> 2.0) 141 + skyfall (~> 0.6) 142 + sqlite3 (~> 2.5) 143 + thin 144 145 BUNDLED WITH 146 + 2.6.9
+1 -1
LICENSE.txt
··· 1 The zlib License 2 3 - Copyright (c) 2023 Jakub Suder 4 5 This software is provided 'as-is', without any express or implied 6 warranty. In no event will the authors be held liable for any damages
··· 1 The zlib License 2 3 + Copyright (c) 2025 Jakub Suder 4 5 This software is provided 'as-is', without any express or implied 6 warranty. In no event will the authors be held liable for any damages
+159 -22
README.md
··· 1 - <h1>Bluesky feeds in Ruby <img src="https://github.com/mackuba/bluesky-feeds-rb/assets/28465/81159f5a-82f6-4520-82c1-434057905a2c" style="width: 28px; margin-left: 5px; position: relative; top: 1px;"></h1> 2 3 - This repo is an example or template that you can use to create a "feed generator" service for the Bluesky social network that hosts custom feeds. It's a reimplementation of the official TypeScript [feed-generator](https://github.com/bluesky-social/feed-generator) example in Ruby. 4 5 6 - ## How do feed generators work 7 8 - **\#TODO** - please read the README of the official [feed-generator](https://github.com/bluesky-social/feed-generator) project. 9 10 11 ## Architecture of the app 12 13 The project can be divided into three major parts: 14 15 - 1. The "input" part, which subscribes to the firehose stream on the Bluesky server, reads and processes all incoming messages, and saves relevant posts and any other data to a local database. 16 2. The "output" part, which makes the list of posts available as a feed server that implements the required "feed generator" endpoints. 17 - 3. Everything else in the middle - the database, the models and feed classes. 18 19 - The first two parts were mostly abstracted away in the forms of two Ruby gems, namely [skyfall](https://github.com/mackuba/skyfall) for connecting to the firehose and [blue_factory](https://github.com/mackuba/blue_factory) for hosting the feed generator interface. The part in the middle is mostly up to you, since it depends greatly on what exactly you want to achieve (what kind of feed algorithms to implement, what data you need to keep, what database to use and so on) - but you can use this project as a good starting point. 20 21 - See the repositories of these two projects for more info on what they implement and how you can configure and use them. 22 23 24 ## Setting up 25 26 - First, you need to set up the database. By default, the app is configured to use SQLite and to create database files in `db` directory. If you want to use e.g. MySQL or PostgreSQL, you need to add a different database adapter gem to the [`Gemfile`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/Gemfile) and change the configuration in [`config/database.yml`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/config/database.yml). 27 28 To create the database, run the migrations: 29 ··· 31 bundle exec rake db:migrate 32 ``` 33 34 - The feed configuration is done in [`app/config.rb`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/app/config.rb). You need to set there: 35 36 - the DID identifier of the publisher (your account) 37 - the hostname on which the service will be running 38 39 - Next, you need to create some feed classes in [`app/feeds`](https://github.com/mackuba/bluesky-feeds-rb/tree/master/app/feeds). See the included feeds like [`StarWarsFeed`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/app/feeds/star_wars_feed.rb) as an example. The [`Feed`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/app/feeds/feed.rb) superclass provides a `#get_posts` implementation which loads the posts in a feed in response to a request and passes the URIs to the server. 40 41 Once you have the feeds prepared, configure them in `app/config.rb`: 42 ··· 44 BlueFactory.add_feed 'starwars', StarWarsFeed.new 45 ``` 46 47 48 - ### Running in development 49 50 - To run the firehose stream, use the [`firehose.rb`](https://github.com/mackuba/bluesky-feeds-rb/tree/master/firehose.rb) script. By default, it will save all posts to the database and print progress dots for every saved post, and will print the text of each post that matches any feed's conditions. See the options in the file to change this. 51 52 - In another terminal window, use the [`server.rb`](https://github.com/mackuba/bluesky-feeds-rb/tree/master/server.rb) script to run the server. It should respond to such requests: 53 54 ``` 55 curl -i http://localhost:3000/.well-known/did.json ··· 57 curl -i http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:.../app.bsky.feed.generator/starwars 58 ``` 59 60 - ### Running in production 61 62 - First, you need to make sure that the firehose script is always running and is restarted if necessary. One option to do this could be writing a `systemd` service config file and adding it to `/etc/systemd/system`. You can find an example service file in [`dist/bsky_feeds.service`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/dist/bsky_feeds.service). 63 64 - To run the server part, you need an HTTP server and a Ruby app server. The choice is up to you and the configuration will depend on your selected config. My recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. `systemd` like the firehose). You can find an example of Nginx configuration for Passenger in [`dist/feeds-nginx.conf`](https://github.com/mackuba/bluesky-feeds-rb/blob/master/dist/feeds-nginx.conf). 65 66 67 - ## Publishing the feed 68 69 Once you have the feed deployed to the production server, you can use the `bluesky:publish` Rake task (from the [blue_factory](https://github.com/mackuba/blue_factory) gem) to upload the feed configuration to the Bluesky network. 70 71 You need to make sure that you have configured the feed's metadata in the feed class: 72 73 - - `display_name` (required) - the publicly visible name of your feed, e.g. "Star Wars Feed" (should be something short) 74 - - `description` (optional) - a longer (~1-2 lines) description of what the feed does, displayed on the feed page as the "bio" 75 - - `avatar_file` (optional) - path to an avatar image from the project's root (PNG or JPG) 76 77 When you're ready, run the rake task passing the feed key (you will be asked for the uploader account's password): 78 ··· 81 ``` 82 83 84 ## Credits 85 86 - Copyright ยฉ 2023 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/mackuba.eu)). 87 88 The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT). 89
··· 1 + <h1>Bluesky feeds in Ruby &nbsp;<img src="https://raw.githubusercontent.com/mackuba/bluesky-feeds-rb/refs/heads/master/images/ruby.png" width="26"></h1> 2 + 3 + This repo is an example or template that you can use to create a "feed generator" service for the Bluesky social network which hosts custom feeds. It's a reimplementation of the official TypeScript [feed-generator](https://github.com/bluesky-social/feed-generator) example in Ruby. 4 + 5 + This app is extracted from my personal feed service app running on [blue.mackuba.eu](https://blue.mackuba.eu) which hosts all my custom feeds. My own project has the exact same structure, it just has more feeds, models and stuff in it (and I recently migrated it to Postgres). 6 7 + 8 + ## How feed generators work 9 + 10 + This is well explained on the Bluesky documentation site, in the section "[Custom Feeds](https://docs.bsky.app/docs/starter-templates/custom-feeds)", and in the readme of the official TypeScript [feed-generator](https://github.com/bluesky-social/feed-generator) project. 11 + 12 + The gist is this: 13 14 + - you (feed operator) run a service on your server, which implements a few specific XRPC endpoints 15 + - a feed record is uploaded to your account, including metadata and location of the feed generator service 16 + - when the user wants to load the feed, the AppView makes a request to your service on their behalf 17 + - your service looks at the request params, and returns a list of posts it selected in the form of at:// URIs 18 + - the AppView takes those URIs and maps them to full posts, which it returns to the user's app 19 20 + How exactly those posts are selected to be returned in the given request is completely up to you, the only requirement is that these are posts that the AppView will have in its database, since you only send URIs, not actual post data. In most cases, these will be "X latest posts matching some condition". In the request, you get the URI of the specific feed (there can be, and usually is, more than one on the service), `limit`, `cursor`, and an authentication token from which you can extract the DID of the calling user (in case the feed is a personalized one). 21 22 + It's not a strict requirement, but in order to be able to pick and return those post URIs, in almost all cases the feed service also needs to have a separate component that streams posts from the relay "firehose" and saves some or all of them to a local database. 23 24 25 ## Architecture of the app 26 27 The project can be divided into three major parts: 28 29 + 1. The "input" part, which subscribes to the firehose stream on the Bluesky relay, reads and processes all incoming messages, and saves relevant posts and any other data to a local database. 30 2. The "output" part, which makes the list of posts available as a feed server that implements the required "feed generator" endpoints. 31 + 3. Everything else in the middle โ€“ the database, the models and feed classes. 32 + 33 + The first two parts were mostly abstracted away in the forms of two Ruby gems, namely [skyfall](https://github.com/mackuba/skyfall) for connecting to the firehose and [blue_factory](https://github.com/mackuba/blue_factory) for hosting the feed generator interface. See the repositories of these two projects for more info on what they implement and how you can configure and use them. 34 + 35 + The part in the middle is mostly up to you, since it depends greatly on what exactly you want to achieve (what kind of feed algorithms to implement, what data you need to keep, what database to use and so on) โ€“ but you can use this project as a good starting point. 36 + 37 + (The rest of the readme assumes you know Ruby to some degree and are at least somewhat familiar with ActiveRecord.) 38 + 39 + 40 + ### Feeds 41 + 42 + The Bluesky API allows you to run feeds using basically any algorithm you want, and there are several main categories of feeds: chronological feeds based on keywords or some simple conditions, "top of the week" feeds sorted by number of likes, or Discover-like feeds with a personalized algorithm and a random-ish order. 43 + 44 + The [blue_factory](https://github.com/mackuba/blue_factory) gem used here should allow you to build any kind of feed you want; its main API is a `get_posts` method that you implement in a feed class, which gets request params and returns an array of hashes with post URIs. The decision on how to pick these URIs is up to you. 45 + 46 + However, the sample code in this project is mostly targeted at the most common type of feeds, the keyword-based chronological ones. It defines a base feed class [Feed](app/feeds/feed.rb), which includes an implementation of `get_posts` that loads post records from the database, where they have been already assigned earlier to the given feed, so the request handling involves only a very simple query filtered by feed ID. The subclasses of `Feed` provide their versions of a `post_matches?` method, which is used by the firehose client process to determine where a post should be added. 47 + 48 + If you want to implement a different kind of feed, e.g. a Following-style feed like my "[Follows & Replies](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr/feed/follows-replies)", that should also be possible with this architecture, but you need to implement a custom version of `get_posts` in a given feed class that does some more complex queries. 49 + 50 + The feed classes also include a set of simple getter methods that return metadata about the given feed, like name, description, avatar etc. 51 + 52 + 53 + ### Database & models 54 + 55 + By default, the app is configured to use SQLite and to create database files in `db` directory. Using MySQL or PostgreSQL should also be possible with some minor changes in the code (I've tried both) โ€“ but SQLite has been working mostly fine for me in production with a database as large as 200+ GB (>200 mln post records). The important thing here is that there's only one "writer" (the firehose process), and the Sinatra server process(es) only read data, so you don't normally run into concurrent write issues, unless you add different unrelated features. 56 + 57 + There are two main tables/models: [Post](app/models/post.rb) and [FeedPost](app/models/feed_post.rb). `Post` stores post records as received from the Bluesky relay โ€“ the DID of the author, "rkey" of the post record, the post text, and other data as JSON. 58 + 59 + `FeedPost` records link specific posts into specific feeds. When a post is saved, the post instance is passed to all configured feed classes, and each of them checks (via `post_matches?`) if the post matches the feed's keywords and if it should be included in that feed. In such case, a matching `FeedPost` is also created. `feed_posts` is kind of like a many-to-many join table, except there is no `feeds` table, it's sort of virtual (feeds are defined in code). `FeedPost` records have a `post_id` referencing a `Post`, and a `feed_id` with the feed number, which is defined in subclasses of the `Feed` class (which is *not* an AR model). Each `Feed` class has one different `feed_id` assigned in code. 60 + 61 + The app can be configured to either save every single post, with only some of them having `FeedPost` records referencing them, or to save only the posts which have been added to at least one feed. The mode is selected by using the options `-da` or `-dm` respectively in [`bin/firehose`](bin/firehose). By default, the app uses `-da` (all) mode in development and the `-dm` (matching) mode in production. 62 + 63 + Saving all posts allows you to rescan posts when you make changes to a feed and include older posts that were skipped before, but at the cost of storing orders of magnitude more data (around 4 mln posts daily as of July 2025). Saving only matching posts keeps the database much more compact and manageable, but without the ability to re-check missed older posts (or to build feeds using different algorithms than plain keyword matching, e.g. Following-style feeds). 64 + 65 + There is an additional `subscriptions` table, which stores the most recent cursor for the relay you're connecting to. This is used when reconnecting after network connection issues or downtime, so you can catch up the missed events added in the meantime since last known position. 66 + 67 + 68 + ### Firehose client 69 + 70 + The firehose client service, using the [skyfall](https://github.com/mackuba/skyfall) gem, is implemented in [`app/firehose_stream.rb`](app/firehose_stream.rb). Skyfall handles things like connecting to the websocket and parsing the returned messages; what you need is mostly to provide lifecycle callbacks (which mostly print logs), and to handle the incoming messages by checking the message type and included data. 71 + 72 + The most important message type is `:commit`, which includes record operations. For those messages, we check the record type and process the record accordingly โ€“ in this case, we're only really looking at post records (`:bsky_post`). For "delete" events we find and delete a `Post` record, for "create" events we build one from the provided data, then pass it through all configured feeds to see if it matches any, then optionally create `FeedPost` references. 73 + 74 + All processing here is done inline, single-threaded, within the event processing loop. This should be [more than fine](https://journal.mackuba.eu/2025/06/24/firehose-go-brrr/) in practice even with firehose traffic several times bigger than it is now, as long as you aren't doing (a lot of) network requests within the loop. This could be expanded into a multi-process setup with a Redis queue and multiple workers, but right now there's no need for that. 75 + 76 77 + ### XRPC Server 78 79 + The server implementation is technically in the [blue_factory](https://github.com/mackuba/blue_factory) gem. It's based on [Sinatra](https://sinatrarb.com), and the Sinatra class implementing the 3 required endpoints is included there in [`server.rb`](https://github.com/mackuba/blue_factory/blob/master/lib/blue_factory/server.rb) and can be used as is. It's configured using static methods on the `BlueFactory` module, which is done in [`app/config.rb`](app/config.rb) here. 80 + 81 + As an optional thing, the [`app/server.rb`](app/server.rb) file includes some slightly convoluted code that lets you run a block of code in the context of the Sinatra server class, where you can use any normal [Sinatra APIs](https://sinatrarb.com/intro.html) to define additional routes, helpers, change Sinatra configuration, and so on. The example there adds a root `/` endpoint, which returns simple HTML listing available feeds. 82 83 84 ## Setting up 85 86 + This app should run on any somewhat recent version of Ruby, but of course it's recommended to run one that's still maintained, ideally the latest one. It's also recommended to install it with [YJIT support](https://www.leemeichin.com/posts/ruby-32-jit.html), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). 87 + 88 + First, you need to install the dependencies of course: 89 + 90 + ``` 91 + bundle install 92 + ``` 93 + 94 + Next, set up the SQLite database. If you want to use e.g. MySQL or PostgreSQL, you need to add a different database adapter gem to the [`Gemfile`](./Gemfile) and change the configuration in [`config/database.yml`](config/database.yml). 95 96 To create the database, run the migrations: 97 ··· 99 bundle exec rake db:migrate 100 ``` 101 102 + 103 + ### Configuration 104 + 105 + The feed configuration is done in [`app/config.rb`](app/config.rb). You need to set there: 106 107 - the DID identifier of the publisher (your account) 108 - the hostname on which the service will be running 109 110 + Next, you need to create some feed classes in [`app/feeds`](app/feeds). See the included feeds like [StarWarsFeed](app/feeds/star_wars_feed.rb) as an example. 111 112 Once you have the feeds prepared, configure them in `app/config.rb`: 113 ··· 115 BlueFactory.add_feed 'starwars', StarWarsFeed.new 116 ``` 117 118 + The first argument is the "rkey" which will be visible at the end of the feed URL. 119 + 120 + If you want to implement some kind of authentication or personalization in your feeds, uncomment the `:enable_unsafe_auth` line in the `config.rb`, and see the commented out alternative implementation of `get_posts` in [`app/feeds/feed.rb`](app/feeds/feed.rb#L68-L99). 121 + 122 + (Note: as the "unsafe" part in the name implies, this does not currently fully validate the user tokens โ€“ see the "[Authentication](https://github.com/mackuba/blue_factory#authentication)" section in the `blue_factory` readme for more info.) 123 + 124 + 125 + ## Running in development 126 + 127 + The app uses two separate processes, for the firehose stream client, and for the XRPC server that handles incoming feed requests. 128 129 + To run the firehose client, use the [`bin/firehose`](bin/firehose) script. By default, it will save all posts to the database and print progress dots for every saved post, and will print the text of each post that matches any feed's conditions. See the options in the file or in `--help` output to change this. 130 131 + The app uses one of Bluesky's official [Jetstream](https://github.com/bluesky-social/jetstream) servers as the source by default. If you want to use a different Jetstream server, edit `DEFAULT_JETSTREAM` in [`app/firehose_stream.rb`](app/firehose_stream.rb#L14), or pass a `FIREHOSE=...` env variable on the command line. You can also use a full ATProto relay instead โ€“ in that case you will also need to replace the [initializer in the `start` method](app/firehose_stream.rb#L39-L45). 132 133 + In another terminal window, use the [`bin/server`](bin/server) script to run the server. It should respond to such requests: 134 135 ``` 136 curl -i http://localhost:3000/.well-known/did.json ··· 138 curl -i http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:.../app.bsky.feed.generator/starwars 139 ``` 140 141 + ### Useful Rake tasks 142 + 143 + While working on feeds, you may find these two Rake tasks useful: 144 + 145 + ``` 146 + bundle exec rake print_feed KEY=starwars | less -r 147 + ``` 148 149 + This task prints the posts included in a feed in a readable format, reverse-chronologically. Optionally, add e.g. `N=500` to include more entries (default is 100). 150 151 + ``` 152 + bundle exec rake rebuild_feed ... 153 + ``` 154 155 + This task rescans the posts in the database after you edit some feed code, and adds/removes posts to the feed if they now match / no longer match. It has three main modes: 156 157 + ``` 158 + bundle exec rake rebuild_feed KEY=starwars DAYS=7 159 + ``` 160 + 161 + - removes all current posts from the feed, scans the given number of days back and re-adds matching posts to the feed 162 + 163 + ``` 164 + bundle exec rake rebuild_feed KEY=starwars DAYS=7 APPEND_ONLY=1 165 + ``` 166 + 167 + - scans the given number of days back and adds additional matching posts to the feed, but without touching existing posts in the feed 168 + 169 + ``` 170 + bundle exec rake rebuild_feed KEY=starwars ONLY_EXISTING=1 171 + ``` 172 + 173 + - quickly checks only posts included currently in the feed, and removes them if needed 174 + 175 + There are also `DRY_RUN`, `VERBOSE` and `UNSAFE` env options, see [`feeds.rake`](lib/tasks/feeds.rake) for more info. 176 + 177 + 178 + ## Running in production 179 + 180 + In my Ruby projects I'm using the classic [Capistrano](https://capistranorb.com) tool to deploy to production servers (and in the ancient 2.x version, since I still haven't found the time to migrate my setup scripts to 3.xโ€ฆ). There is a sample [`deploy.rb`](config/deploy.rb) config file included in the `config` directory. To use something like Docker or a service like Heroku, you'll need to adapt the config for your specific setup. 181 + 182 + On the server, you need to make sure that the firehose process is always running and is restarted if necessary. One option to do this could be writing a systemd service config file and adding it to `/etc/systemd/system`. You can find an example service file in [`dist/bsky_feeds.service`](dist/bsky_feeds.service). 183 + 184 + To run the XRPC service, you need an HTTP server (reverse proxy) and a Ruby app server. The choice is up to you and the configuration will depend on your selected config. My recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd like the firehose). You can find an example of Nginx configuration for Passenger in [`dist/feeds-nginx.conf`](dist/feeds-nginx.conf). 185 + 186 + 187 + ### Publishing the feed 188 189 Once you have the feed deployed to the production server, you can use the `bluesky:publish` Rake task (from the [blue_factory](https://github.com/mackuba/blue_factory) gem) to upload the feed configuration to the Bluesky network. 190 191 You need to make sure that you have configured the feed's metadata in the feed class: 192 193 + - `display_name` (required) โ€“ the publicly visible name of your feed, e.g. "Star Wars Feed" (should be something short) 194 + - `description` (optional) โ€“ a longer (~1-2 lines) description of what the feed does, displayed on the feed page as the "bio" 195 + - `avatar_file` (optional) โ€“ path to an avatar image from the project's root (PNG or JPG) 196 197 When you're ready, run the rake task passing the feed key (you will be asked for the uploader account's password): 198 ··· 201 ``` 202 203 204 + ### App maintenance 205 + 206 + If you're running the app in "save all" mode in production, at some point you will probably need to start cleaning up older posts periodically. You can use this Rake task for this: 207 + 208 + ``` 209 + bundle exec rake cleanup_posts DAYS=30 210 + ``` 211 + 212 + This will delete posts older than 30 days, but only if they aren't assigned to any feed. 213 + 214 + Another Rake task lets you remove a specific post manually from a feed โ€“ this might be useful e.g. if you notice that something unexpected ๐Ÿ† has been included in your feed, and you want to quickly delete it from there without having to edit & redeploy the code: 215 + 216 + ``` 217 + bundle exec rake delete_feed_item URL=https://bsky.app/profile/example.com/post/xxx 218 + ``` 219 + 220 + 221 ## Credits 222 223 + Copyright ยฉ 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/mackuba.eu)). 224 225 The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT). 226
+3 -55
Rakefile
··· 1 require 'bundler/setup' 2 - 3 require 'blue_factory/rake' 4 require 'sinatra/activerecord' 5 require 'sinatra/activerecord/rake' 6 7 - require_relative 'app/config' 8 - 9 - def get_feed 10 - if ENV['KEY'].to_s == '' 11 - puts "Please specify feed key as KEY=feedname (the part of the feed's at:// URI after the last slash)" 12 - exit 1 13 - end 14 - 15 - feed_key = ENV['KEY'] 16 - feed = BlueFactory.get_feed(feed_key) 17 - 18 - if feed.nil? 19 - puts "No feed configured for key '#{feed_key}' - use `BlueFactory.add_feed '#{feed_key}', MyFeed.new`" 20 - exit 1 21 - end 22 - 23 - feed 24 - end 25 - 26 - desc "Print posts in the feed, starting from the newest ones (limit = N)" 27 - task :print_feed do 28 - feed = get_feed 29 - limit = ENV['N'] ? ENV['N'].to_i : 100 30 - 31 - posts = FeedPost.where(feed_id: feed.feed_id).joins(:post).order('feed_posts.time DESC').limit(limit).map(&:post) 32 - 33 - posts.each do |s| 34 - puts "#{s.time} * https://bsky.app/profile/#{s.repo}/post/#{s.rkey}" 35 - puts s.text 36 - puts 37 - end 38 - end 39 40 - desc "Rescan all posts and rebuild the feed from scratch" 41 - task :rebuild_feed do 42 - feed = get_feed 43 - 44 - puts "Cleaning up feed..." 45 - FeedPost.where(feed_id: feed.feed_id).delete_all 46 - 47 - total = Post.count 48 - 49 - puts "Loading posts..." 50 - posts = Post.all.to_a 51 - 52 - posts.each_with_index do |post, i| 53 - print "Processing posts... [#{i + 1}/#{total}]\r" 54 - $stdout.flush 55 - 56 - if feed.post_matches?(post) 57 - FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time) 58 - end 59 - end 60 - 61 - puts "Processing posts... Done." + " " * 30 62 end
··· 1 require 'bundler/setup' 2 require 'blue_factory/rake' 3 require 'sinatra/activerecord' 4 require 'sinatra/activerecord/rake' 5 6 + Rake.add_rakelib File.join(__dir__, 'lib', 'tasks') 7 8 + if ENV['ARLOG'] == '1' 9 + ActiveRecord::Base.logger = Logger.new(STDOUT) 10 end
+7 -18
app/config.rb
··· 1 - Dir[File.join(__dir__, 'feeds', '*.rb')].each { |f| require(f) } 2 3 - require 'blue_factory' 4 - require 'sinatra/activerecord' 5 - 6 - ActiveRecord::Base.connection.execute "PRAGMA journal_mode = WAL" 7 8 BlueFactory.set :publisher_did, 'did:plc:<your_identifier_here>' 9 BlueFactory.set :hostname, 'feeds.example.com' 10 11 BlueFactory.add_feed 'linux', LinuxFeed.new 12 BlueFactory.add_feed 'starwars', StarWarsFeed.new 13 - 14 - # do any additional config & customization on BlueFactory::Server here: 15 - # 16 - # BlueFactory::Server.disable :logging 17 - # BlueFactory::Server.set :port, 4000 18 - # 19 - # BlueFactory::Server.get '/' do 20 - # redirect 'https://web.example.com' 21 - # end 22 - # 23 - # BlueFactory::Server.before do 24 - # headers "X-Powered-By" => "BlueFactory/#{BlueFactory::VERSION}" 25 - # end
··· 1 + require_relative 'init' 2 3 + Dir[File.join(__dir__, 'feeds', '*.rb')].each { |f| require(f) } 4 5 BlueFactory.set :publisher_did, 'did:plc:<your_identifier_here>' 6 BlueFactory.set :hostname, 'feeds.example.com' 7 8 + # uncomment to enable authentication (note: does not verify signatures) 9 + # see Feed#get_posts(params, visitor_did) in app/feeds/feed.rb 10 + # BlueFactory.set :enable_unsafe_auth, true 11 + 12 + BlueFactory.add_feed 'kit', KitFeed.new 13 BlueFactory.add_feed 'linux', LinuxFeed.new 14 BlueFactory.add_feed 'starwars', StarWarsFeed.new
+49
app/feeds/feed.rb
··· 1 require 'blue_factory/errors' 2 require 'time' 3 4 require_relative '../models/feed_post' ··· 31 nil 32 end 33 34 def get_posts(params) 35 limit = check_query_limit(params) 36 query = FeedPost.where(feed_id: feed_id).joins(:post).select('posts.repo, posts.rkey, feed_posts.time, post_id') ··· 48 49 { cursor: cursor, posts: posts.map { |p| 'at://' + p.repo + '/app.bsky.feed.post/' + p.rkey }} 50 end 51 52 53 private
··· 1 require 'blue_factory/errors' 2 + require 'rainbow' 3 require 'time' 4 5 require_relative '../models/feed_post' ··· 32 nil 33 end 34 35 + # (optional) special feed type (return :video for video feeds) 36 + def content_mode 37 + nil 38 + end 39 + 40 + # (optional) should posts be added to the feed from the firehose? 41 + def is_updating? 42 + true 43 + end 44 + 45 + # if the feed matches posts using keywords/regexps, highlight these keywords in the passed text 46 + def colored_text(text) 47 + text 48 + end 49 + 50 def get_posts(params) 51 limit = check_query_limit(params) 52 query = FeedPost.where(feed_id: feed_id).joins(:post).select('posts.repo, posts.rkey, feed_posts.time, post_id') ··· 64 65 { cursor: cursor, posts: posts.map { |p| 'at://' + p.repo + '/app.bsky.feed.post/' + p.rkey }} 66 end 67 + 68 + # Use this version of the method when enable_unsafe_auth is set to true in app/config.rb. 69 + # This method is called with the DID of the user viewing the feed decoded from the auth header. 70 + # 71 + # Important: BlueFactory does not verify auth signatures of the JWT token at the moment, 72 + # which means that anyone can easily spoof the DID in the header - do not use for anything critical! 73 + # 74 + # def get_posts(params, visitor_did) 75 + # # You can use this DID to e.g.: 76 + # # 1) Provide a personalized feed: 77 + # 78 + # user = User.find_by(did: visitor_did) 79 + # raise BlueFactory::AuthorizationError unless user 80 + # build_feed_for_user(user) 81 + # 82 + # # 2) Only allow whitelisted users to view the feed and show it as empty to others: 83 + # if visitor_did == BlueFactory.publisher_did || ALLOWED_USERS.include?(visitor_did) 84 + # get_feed_posts(params) 85 + # else 86 + # { posts: [] } 87 + # end 88 + # 89 + # # 3) Ban some users from accessing the feed: 90 + # # (note: you can return a custom error message that will be shown in the error banner in the app) 91 + # if BANNED_USERS.include?(visitor_did) 92 + # raise BlueFactory::AuthorizationError, "You shall not pass!" 93 + # else 94 + # get_feed_posts(params) 95 + # end 96 + # 97 + # # 4) Collect feed analytics (please take the privacy of your users into account) 98 + # Stats.count_visit(visitor_did) 99 + # end 100 101 102 private
+50
app/feeds/kit_feed.rb
···
··· 1 + require_relative 'feed' 2 + 3 + class KitFeed < Feed 4 + KIT_REGEX = [/\bkit\b/i] 5 + CAT_REGEX = [/\bcat\b/i, /\bkitty\b/i] 6 + 7 + PAUL = 'did:plc:ragtjsm2j2vknwkz3zp4oxrd' 8 + 9 + def feed_id 10 + 3 11 + end 12 + 13 + def display_name 14 + "Kit Feed" 15 + end 16 + 17 + def description 18 + "Photos of Paul's lovely cat Kit ๐Ÿฑ" 19 + end 20 + 21 + def avatar_file 22 + "images/kitkat.jpg" 23 + end 24 + 25 + def post_matches?(post) 26 + return false unless post.repo == PAUL 27 + 28 + alt = embed_text(post) 29 + return false if alt.nil? 30 + 31 + KIT_REGEX.any? { |r| alt =~ r } || (CAT_REGEX.any? { |r| alt =~ r } && KIT_REGEX.any? { |r| post.text =~ r }) 32 + end 33 + 34 + def embed_text(post) 35 + if embed = post.record['embed'] 36 + if images = (embed['images'] || embed['media'] && embed['media']['images']) 37 + images.map { |i| i['alt'] }.compact.join("\n") 38 + end 39 + end 40 + end 41 + 42 + def colored_text(t) 43 + text = t.dup 44 + 45 + KIT_REGEX.each { |r| text.gsub!(r) { |s| Rainbow(s).green }} 46 + CAT_REGEX.each { |r| text.gsub!(r) { |s| Rainbow(s).bright.orange }} 47 + 48 + text 49 + end 50 + end
+45 -3
app/feeds/linux_feed.rb
··· 2 3 class LinuxFeed < Feed 4 REGEXPS = [ 5 - /linux/i, /debian/i, /ubuntu/i, /\bKDE\b/, /\bGTK\d?\b/ 6 ] 7 8 def feed_id ··· 14 end 15 16 def description 17 - "Feed with posts about Linux" 18 end 19 20 def avatar_file ··· 22 end 23 24 def post_matches?(post) 25 - REGEXPS.any? { |r| post.text =~ r } 26 end 27 end
··· 2 3 class LinuxFeed < Feed 4 REGEXPS = [ 5 + /linux/i, /debian/i, /ubuntu/i, /\bredhat\b/i, /\bRHEL\b/, /\bSUSE\b/, /\bCentOS\b/, /\bopensuse\b/i, 6 + /\bslackware\b/i, /\bKDE\b/, /\bGTK\d?\b/, /#GNOME\b/, /\bGNOME\s?\d+/, /\bkde plasma\b/i, 7 + /apt\-get/, /\bflatpak\b/i, /\b[Xx]org\b/ 8 + ] 9 + 10 + EXCLUDE = [ 11 + /\bmastos?\b/i, /mast[oa]d[oa]n/i, /\bfederat(ion|ed)\b/i, /fediverse/i, /at\s?protocol/i, 12 + /social (media|networks?)/i, /microblogging/i, /\bthreads\b/i, /\bnostr\b/i, 13 + /the linux of/i, /linux (bros|nerds)/i, /ubuntu tv/i 14 + ] 15 + 16 + LINK_EXCLUDES = [ 17 + /\bamzn\.to\b/i, /\bwww\.amazon\.com\b/i, /\bmercadolivre\.com\b/i, 18 + ] 19 + 20 + MUTED_PROFILES = [ 21 + 'did:plc:35c6qworuvguvwnpjwfq3b5p', # Linux Kernel Releases 22 + 'did:plc:ppuqidjyabv5iwzeoxt4fq5o', # GitHub Trending JS/TS 23 + 'did:plc:eidn2o5kwuaqcss7zo7ivye5', # GitHub Trending 24 + 'did:plc:lontmsdex36tfjyxjlznnea7', # RustTrending 25 + 'did:plc:myutg2pwkjbukv7pq2hp5mtl', # CVE Alerts 26 ] 27 28 def feed_id ··· 34 end 35 36 def description 37 + "All posts on Bluesky about Linux and its popular distributions & desktop environments" 38 end 39 40 def avatar_file ··· 42 end 43 44 def post_matches?(post) 45 + return false if MUTED_PROFILES.include?(post.repo) 46 + 47 + REGEXPS.any? { |r| post.text =~ r } && !(EXCLUDE.any? { |r| post.text =~ r }) && !has_forbidden_links?(post) 48 + end 49 + 50 + def has_forbidden_links?(post) 51 + if embed = post.record['embed'] 52 + if link = (embed['external'] || embed['media'] && embed['media']['external']) 53 + return true if LINK_EXCLUDES.any? { |r| r =~ link['uri'] } 54 + end 55 + end 56 + 57 + return false 58 + end 59 + 60 + def colored_text(t) 61 + text = t.dup 62 + 63 + EXCLUDE.each { |r| text.gsub!(r) { |s| Rainbow(s).red }} 64 + LINK_EXCLUDES.each { |r| text.gsub!(r) { |s| Rainbow(s).red }} 65 + REGEXPS.each { |r| text.gsub!(r) { |s| Rainbow(s).green }} 66 + 67 + text 68 end 69 end
+9 -1
app/feeds/star_wars_feed.rb
··· 16 end 17 18 def description 19 - "Feed with posts about Star Wars" 20 end 21 22 def avatar_file ··· 25 26 def post_matches?(post) 27 REGEXPS.any? { |r| post.text =~ r } 28 end 29 end
··· 16 end 17 18 def description 19 + "Feed with posts mentioning Star Wars movies and TV shows and various SW characters" 20 end 21 22 def avatar_file ··· 25 26 def post_matches?(post) 27 REGEXPS.any? { |r| post.text =~ r } 28 + end 29 + 30 + def colored_text(t) 31 + text = t.dup 32 + 33 + REGEXPS.each { |r| text.gsub!(r) { |s| Rainbow(s).green }} 34 + 35 + text 36 end 37 end
+236 -35
app/firehose_stream.rb
··· 5 require_relative 'config' 6 require_relative 'models/feed_post' 7 require_relative 'models/post' 8 9 class FirehoseStream 10 - attr_accessor :show_progress, :log_status, :log_posts, :save_posts 11 12 - def initialize 13 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym 14 15 @show_progress = (@env == :development) ? true : false 16 @log_status = true 17 @log_posts = (@env == :development) ? :matching : false 18 @save_posts = (@env == :development) ? :all : :matching 19 20 - @feeds = BlueFactory.all_feeds 21 end 22 23 def start 24 return if @sky 25 26 - @sky = Skyfall::Stream.new('bsky.social', :subscribe_repos) 27 28 @sky.on_message do |m| 29 - handle_message(m) 30 end 31 32 if @log_status 33 - @sky.on_connect { puts "Connected #{Time.now} โœ“" } 34 - @sky.on_disconnect { puts; puts "Disconnected #{Time.now}" } 35 - @sky.on_reconnect { puts "Reconnecting..." } 36 - @sky.on_error { |e| puts "ERROR: #{Time.now} #{e}" } 37 end 38 39 @sky.connect 40 end 41 42 def stop 43 @sky&.disconnect 44 @sky = nil 45 end 46 47 - def handle_message(msg) 48 - return if msg.type != :commit 49 50 msg.operations.each do |op| 51 case op.type ··· 54 55 when :bsky_like, :bsky_repost 56 # if you want to use the number of likes and/or reposts for filtering or sorting: 57 - # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action 58 59 when :bsky_follow 60 # if you want to make a personalized feed that needs info about given user's follows/followers: ··· 63 else 64 # other types like :bsky_block, :bsky_profile (includes profile edits) 65 end 66 end 67 end 68 69 def process_post(msg, op) 70 if op.action == :delete 71 - if post = Post.find_by(repo: op.repo, rkey: op.rkey) 72 post.destroy 73 end 74 end ··· 76 return unless op.action == :create 77 78 begin 79 - text = op.raw_record['text'] 80 81 - # tip: if you don't need full record data for debugging, delete the data column in posts 82 - post = Post.new( 83 - repo: op.repo, 84 - time: msg.time, 85 - text: text, 86 - rkey: op.rkey, 87 - data: JSON.generate(op.raw_record) 88 - ) 89 90 - matched = false 91 92 - @feeds.each do |feed| 93 - if feed.post_matches?(post) 94 - FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) unless !@save_posts 95 - matched = true 96 - end 97 end 98 99 - if @log_posts == :all || @log_posts && matched 100 - puts 101 - puts text 102 end 103 104 - post.save! if @save_posts == :all 105 - rescue StandardError => e 106 - puts "Error: #{e}" 107 - p msg unless @env == :production || e.message == "nesting of 100 is too deep" 108 end 109 110 print '.' if @show_progress && @log_posts != :all 111 end 112 end
··· 5 require_relative 'config' 6 require_relative 'models/feed_post' 7 require_relative 'models/post' 8 + require_relative 'models/subscription' 9 + require_relative 'utils' 10 11 class FirehoseStream 12 + attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events 13 + 14 + DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network' 15 + POSTS_BATCH_SIZE = 100 16 + 17 + include Utils 18 19 + def initialize(service = nil) 20 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym 21 + @service = service || DEFAULT_JETSTREAM 22 23 @show_progress = (@env == :development) ? true : false 24 @log_status = true 25 @log_posts = (@env == :development) ? :matching : false 26 @save_posts = (@env == :development) ? :all : :matching 27 + @replay_events = (@env == :development) ? false : true 28 29 + @feeds = BlueFactory.all_feeds.select(&:is_updating?) 30 + @post_queue = [] 31 end 32 33 def start 34 return if @sky 35 36 + last_cursor = load_or_init_cursor 37 + cursor = @replay_events ? (@start_cursor || last_cursor) : nil 38 + 39 + @sky = sky = Skyfall::Jetstream.new(@service, { 40 + cursor: cursor, 41 + 42 + # we ask Jetstream to only send us post records, since we don't need anything else 43 + # if you need to process e.g. likes or follows too, update or remove this param 44 + wanted_collections: ['app.bsky.feed.post'], 45 + }) 46 + 47 + # set your user agent here to identify yourself on the relay 48 + # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}" 49 + 50 + @sky.check_heartbeat = true 51 52 @sky.on_message do |m| 53 + process_message(m) 54 end 55 56 if @log_status 57 + @sky.on_connecting { |u| log "Connecting to #{u}..." } 58 + 59 + @sky.on_connect { 60 + @message_counter = 0 61 + @replaying = !!(cursor) 62 + log "Connected โœ“" 63 + } 64 + 65 + @sky.on_disconnect { 66 + log "Disconnected." 67 + } 68 + 69 + @sky.on_reconnect { 70 + log "Connection lost, reconnecting..." 71 + } 72 + 73 + @sky.on_timeout { 74 + log "Trying to reconnect..." 75 + } 76 + 77 + @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" } 78 end 79 80 @sky.connect 81 end 82 83 def stop 84 + save_queued_posts 85 + save_cursor(@sky.cursor) unless @sky.nil? 86 + 87 @sky&.disconnect 88 @sky = nil 89 end 90 91 + def load_or_init_cursor 92 + if sub = Subscription.find_by(service: @service) 93 + sub.cursor 94 + else 95 + Subscription.create!(service: @service, cursor: 0) 96 + nil 97 + end 98 + end 99 + 100 + def save_cursor(cursor) 101 + Subscription.where(service: @service).update_all(cursor: cursor) 102 + end 103 + 104 + def process_message(msg) 105 + if msg.type == :info 106 + # ATProto error, the only one right now is "OutdatedCursor" 107 + log "InfoMessage: #{msg}" 108 + 109 + elsif msg.type == :identity 110 + # use these events if you want to track handle changes: 111 + # log "Handle change: #{msg.repo} => #{msg.handle}" 112 + 113 + elsif msg.type == :account 114 + # tracking account status changes, e.g. suspensions, deactivations and deletes 115 + process_account_message(msg) 116 + 117 + elsif msg.unknown? 118 + log "Unknown message type: #{msg.type} (#{msg.seq})" 119 + end 120 + 121 + return unless msg.type == :commit 122 + 123 + if @replaying 124 + log "Replaying events since #{msg.time.getlocal} -->" 125 + @replaying = false 126 + end 127 + 128 + @message_counter += 1 129 + 130 + if @message_counter % 100 == 0 131 + # save current cursor every 100 events 132 + save_cursor(msg.seq) 133 + end 134 135 msg.operations.each do |op| 136 case op.type ··· 139 140 when :bsky_like, :bsky_repost 141 # if you want to use the number of likes and/or reposts for filtering or sorting: 142 + # add a likes/reposts table, then add/remove records here depending on op.action 143 + # (you'll need to track like records and not just have a single numeric "likes" field, 144 + # because delete events only include the uri/rkey of the like, not of the liked post) 145 146 when :bsky_follow 147 # if you want to make a personalized feed that needs info about given user's follows/followers: ··· 150 else 151 # other types like :bsky_block, :bsky_profile (includes profile edits) 152 end 153 + end 154 + end 155 + 156 + def process_account_message(msg) 157 + if msg.status == :deleted 158 + # delete all data we have stored about this account 159 + FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all 160 + Post.where(repo: msg.did).delete_all 161 end 162 end 163 164 def process_post(msg, op) 165 if op.action == :delete 166 + if post = Post.find_by_repo_rkey(op.repo, op.rkey) 167 post.destroy 168 end 169 end ··· 171 return unless op.action == :create 172 173 begin 174 + if op.raw_record.nil? 175 + log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})" 176 + return 177 + end 178 + rescue CBOR::UnpackError => e 179 + log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}" 180 + return 181 + end 182 + 183 + # ignore posts with past date from Twitter etc. imported using some kind of tool 184 + begin 185 + post_time = Time.parse(op.raw_record['createdAt']) 186 + return if post_time < msg.time - 86400 187 + rescue StandardError => e 188 + log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})" 189 + return 190 + end 191 192 + record = op.raw_record 193 + text = record['text'] 194 195 + # to save space, delete redundant post text and type from the saved data JSON 196 + record.delete('$type') 197 + record.delete('text') 198 + trimmed_json = JSON.generate(record) 199 200 + # tip: if you don't need full record data for debugging, delete the data column in posts 201 + post = Post.new( 202 + repo: op.repo, 203 + time: msg.time, 204 + text: text, 205 + rkey: op.rkey, 206 + data: trimmed_json, 207 + record: record 208 + ) 209 + 210 + if !post.valid? 211 + if post.errors.has_key?(:data) 212 + post.trim_too_long_data 213 end 214 215 + if !post.valid? 216 + log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}" 217 + return 218 end 219 + end 220 221 + matched = false 222 + 223 + @feeds.each do |feed| 224 + if feed.post_matches?(post) 225 + post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) unless !@save_posts 226 + matched = true 227 + end 228 + end 229 + 230 + if @log_posts == :all || @log_posts && matched 231 + puts 232 + puts text 233 + end 234 + 235 + if @save_posts == :all 236 + # wait until we have 100 posts and then save them all in one insert, if possible 237 + @post_queue << post 238 + 239 + if @post_queue.length >= POSTS_BATCH_SIZE 240 + save_queued_posts 241 + end 242 + elsif @save_posts == :matching && matched 243 + # save immediately because matched posts might be rare; we've already checked validations 244 + post.save!(validate: false) 245 end 246 247 print '.' if @show_progress && @log_posts != :all 248 + rescue StandardError => e 249 + log "Error in #process_post: #{e}" 250 + 251 + unless e.message == "nesting of 100 is too deep" 252 + log msg.inspect 253 + log e.backtrace.reject { |x| x.include?('/ruby/') } 254 + end 255 + end 256 + 257 + def save_queued_posts 258 + # we can only squash posts into one insert statement if they don't have nested feed_posts 259 + # so we save those without feed_posts first: 260 + 261 + matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 } 262 + 263 + if unmatched.length > 0 264 + values = unmatched.map { |p| p.attributes.except('id') } 265 + Post.insert_all(values) 266 + end 267 + 268 + @post_queue = matched 269 + return if @post_queue.empty? 270 + 271 + # and for those that do have feed_posts, we save them normally, in one transaction: 272 + 273 + ActiveRecord::Base.transaction do 274 + @post_queue.each do |p| 275 + # skip validations since we've checked the posts before adding them to the queue 276 + p.save!(validate: false) 277 + end 278 + end 279 + 280 + @post_queue = [] 281 + rescue StandardError => e 282 + # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which 283 + # aren't covered by AR validations; so in that case, try to save any valid posts one by one: 284 + 285 + @post_queue.each do |p| 286 + begin 287 + ActiveRecord::Base.transaction do 288 + p.save!(validate: false) 289 + end 290 + rescue StandardError => e 291 + log "Error in #save_queued_posts: #{e}" 292 + 293 + unless e.message == "nesting of 100 is too deep" 294 + log p.inspect 295 + log e.backtrace.reject { |x| x.include?('/ruby/') } 296 + end 297 + end 298 + end 299 + 300 + @post_queue = [] 301 + end 302 + 303 + def log(text) 304 + puts if @show_progress 305 + puts "[#{Time.now}] #{text}" 306 + end 307 + 308 + def inspect 309 + vars = instance_variables - [:@feeds] 310 + values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 311 + "#<#{self.class}:0x#{object_id} #{values}>" 312 end 313 end
+26
app/init.rb
···
··· 1 + require 'blue_factory' 2 + require 'sinatra/activerecord' 3 + 4 + if defined?(RubyVM::YJIT) && RubyVM::YJIT.respond_to?(:enabled?) 5 + if !RubyVM::YJIT.enabled? 6 + if RubyVM::YJIT.respond_to?(:enable) 7 + # Ruby 3.3+ 8 + RubyVM::YJIT.enable 9 + else 10 + # Ruby 3.2 11 + puts "-" * 106 12 + puts "Note: YJIT is not enabled. To improve performance, enable it by adding an ENV var RUBYOPT=\"--enable-yjit\"." 13 + puts "-" * 106 14 + end 15 + end 16 + else 17 + puts "-" * 112 18 + puts "Note: YJIT is not enabled. To improve performance, it's recommended to " + 19 + ((RUBY_VERSION.to_f >= 3.2) ? "install Ruby with YJIT support turned on." : "update to a newer Ruby with YJIT support.") 20 + puts "-" * 112 21 + end 22 + 23 + ar_logger = ActiveRecord::Base.logger 24 + ActiveRecord::Base.logger = nil 25 + ActiveRecord::Base.connection.execute "PRAGMA journal_mode = WAL" 26 + ActiveRecord::Base.logger = ar_logger
+75
app/models/post.rb
··· 1 require 'active_record' 2 3 class Post < ActiveRecord::Base 4 validates_presence_of :repo, :time, :data, :rkey 5 validates :text, length: { minimum: 0, allow_nil: false } 6 end
··· 1 require 'active_record' 2 + require 'json' 3 + 4 + require_relative 'feed_post' 5 6 class Post < ActiveRecord::Base 7 validates_presence_of :repo, :time, :data, :rkey 8 validates :text, length: { minimum: 0, allow_nil: false } 9 + validates_length_of :repo, maximum: 60 10 + validates_length_of :rkey, maximum: 16 11 + validates_length_of :text, maximum: 1000 12 + validates_length_of :data, maximum: 10000 13 + 14 + has_many :feed_posts, dependent: :destroy 15 + 16 + attr_writer :record 17 + 18 + def self.find_by_repo_rkey(repo, rkey) 19 + # the '+' is to make sure that SQLite uses the rkey index and not a different one 20 + Post.where("+repo = ?", repo).where(rkey: rkey).first 21 + end 22 + 23 + def self.find_by_at_uri(uri) 24 + parts = uri.gsub(%r(^at://), '').split('/') 25 + return nil unless parts.length == 3 && parts[1] == 'app.bsky.feed.post' 26 + 27 + find_by_repo_rkey(parts[0], parts[2]) 28 + end 29 + 30 + def record 31 + @record ||= JSON.parse(data) 32 + end 33 + 34 + def at_uri 35 + "at://#{repo}/app.bsky.feed.post/#{rkey}" 36 + end 37 + 38 + def quoted_post_uri 39 + if embed = record['embed'] 40 + if embed['$type'] == "app.bsky.embed.record" 41 + return embed['record']['uri'] 42 + elsif embed['$type'] == "app.bsky.embed.recordWithMedia" 43 + if embed['record']['$type'] == "app.bsky.embed.record" 44 + return embed['record']['record']['uri'] 45 + end 46 + end 47 + end 48 + 49 + return nil 50 + end 51 + 52 + def thread_root_uri 53 + if root = (record['reply'] && record['reply']['root']) 54 + root['uri'] 55 + else 56 + nil 57 + end 58 + end 59 + 60 + def parent_uri 61 + if parent = (record['reply'] && record['reply']['parent']) 62 + parent['uri'] 63 + else 64 + nil 65 + end 66 + end 67 + 68 + def trim_too_long_data 69 + if embed = record['embed'] 70 + if external = embed['external'] 71 + external['description'] = '' 72 + end 73 + end 74 + 75 + if record['bridgyOriginalText'] 76 + record['bridgyOriginalText'] = '' 77 + end 78 + 79 + self.data = JSON.generate(record) 80 + end 81 end
+5
app/models/subscription.rb
···
··· 1 + require 'active_record' 2 + 3 + class Subscription < ActiveRecord::Base 4 + validates_presence_of :service, :cursor 5 + end
+32
app/post_console_printer.rb
···
··· 1 + require 'json' 2 + require 'rainbow' 3 + 4 + class PostConsolePrinter 5 + def initialize(feed) 6 + @feed = feed 7 + Rainbow.enabled = true 8 + end 9 + 10 + def display(post) 11 + print Rainbow(post.time).bold + ' * ' + Rainbow(post.id).bold + ' * ' 12 + 13 + langs = post.record['langs'] 14 + if langs.nil? 15 + print '[nil] * ' 16 + elsif langs != ['en'] 17 + print "[#{langs.join(', ')}] * " 18 + end 19 + 20 + puts Rainbow("https://bsky.app/profile/#{post.repo}/post/#{post.rkey}").darkgray 21 + puts 22 + puts @feed.colored_text(post.text) 23 + if post.record['embed'] 24 + json = JSON.generate(post.record['embed']) 25 + colored = @feed.colored_text(json) 26 + puts colored unless colored == json 27 + end 28 + puts 29 + puts "---" 30 + puts 31 + end 32 + end
+56
app/server.rb
···
··· 1 + require 'blue_factory' 2 + 3 + module Server 4 + def self.configure 5 + self.instance_method(:run).bind_call(BlueFactory::Server) 6 + end 7 + 8 + def run 9 + register Sinatra::ActiveRecordExtension 10 + 11 + # do any additional config & customization on BlueFactory::Server here 12 + # see Sinatra docs for more info: https://sinatrarb.com/intro.html 13 + # e.g.: 14 + # 15 + # disable :logging 16 + # enable :static 17 + # set :views, File.expand_path('views', __dir__) 18 + # set :default_encoding, 'cp1250' 19 + # 20 + # before do 21 + # headers "X-Powered-By" => "BlueFactory/#{BlueFactory::VERSION}" 22 + # end 23 + # 24 + # get '/' do 25 + # erb :index 26 + # end 27 + 28 + get '/' do 29 + content_type 'text/html' 30 + 31 + html = %( 32 + <style> 33 + body { width: 960px; margin: 40px auto; } li { margin: 5px 0px; } 34 + a { text-decoration: none; color: #00e; } a:hover { text-decoration: underline; } a:visited { color: #00e; } 35 + </style> 36 + <h2>Bluesky Feed Server at #{request.host}</h2> 37 + <p>This is an AT Protocol XRPC service hosting a Bluesky custom feed generator.</p> 38 + <p>Available feeds:</p> 39 + <ul> 40 + ) 41 + 42 + BlueFactory.feed_keys.each do |k| 43 + feed = BlueFactory.get_feed(k) 44 + title = feed.display_name 45 + html << %(<li><a href="https://bsky.app/profile/#{BlueFactory.publisher_did}/feed/#{k}">#{title}</a></li>\n) 46 + end 47 + 48 + html << %( 49 + </ul> 50 + <p>Powered by Ruby and <a href="https://github.com/mackuba/blue_factory">BlueFactory</a>.</p> 51 + ) 52 + 53 + html 54 + end 55 + end 56 + end
+13
app/utils.rb
···
··· 1 + require 'didkit' 2 + 3 + module Utils 4 + def handle_from_did(did) 5 + DID.new(did).get_validated_handle 6 + end 7 + 8 + def did_from_handle(handle) 9 + DID.resolve_handle(handle).did 10 + end 11 + 12 + extend self 13 + end
+9
bin/console
···
··· 1 + #!/usr/bin/env ruby 2 + 3 + require 'bundler/setup' 4 + require 'irb' 5 + require_relative '../app/config' 6 + 7 + Dir[File.join(__dir__, '..', 'app', '**', '*.rb')].each { |f| require(f) } 8 + 9 + IRB.start(__FILE__)
+99
bin/firehose
···
··· 1 + #!/usr/bin/env ruby 2 + 3 + $LOAD_PATH.unshift(File.expand_path('..', __dir__)) 4 + 5 + require 'bundler/setup' 6 + require 'app/firehose_stream' 7 + 8 + $stdout.sync = true 9 + 10 + if ENV['ARLOG'] == '1' 11 + ActiveRecord::Base.logger = Logger.new(STDOUT) 12 + else 13 + ActiveRecord::Base.logger = nil 14 + end 15 + 16 + def print_help 17 + puts "Usage: #{$0} [options...]" 18 + puts "Options:" 19 + puts 20 + puts " * Showing progress: [default: show in development]" 21 + puts " -p = show progress dots for each received message" 22 + puts " -np = don't show progress dots" 23 + puts 24 + puts " * Logging status changes: [default: log in any mode]" 25 + puts " -ns = don't log status changes" 26 + puts 27 + puts " * Logging post text: [default: -lm in development, -nl in production]" 28 + puts " -lm = log text of matching posts" 29 + puts " -la = log text of every post" 30 + puts " -nl = don't log posts" 31 + puts 32 + puts " * Saving posts to db: [default: -da in development, -dm in production]" 33 + puts " -da = save all posts to database" 34 + puts " -dm = save only matching posts to database" 35 + puts " -nd = don't save any posts" 36 + puts 37 + puts " * Replaying missed events: [default: -nr in development, -r in production]" 38 + puts " -r = pass a cursor param when connecting to replay any missed events" 39 + puts " -nr = don't replay missed events" 40 + puts " -r12345 = start from this specific cursor" 41 + end 42 + 43 + firehose = FirehoseStream.new(ENV['FIREHOSE']) 44 + 45 + ARGV.each do |arg| 46 + case arg 47 + when '-p' 48 + firehose.show_progress = true 49 + when '-np' 50 + firehose.show_progress = false 51 + when '-ns' 52 + firehose.log_status = false 53 + when '-lm' 54 + firehose.log_posts = :matching 55 + when '-la' 56 + firehose.log_posts = :all 57 + when '-nl' 58 + firehose.log_posts = false 59 + when '-dm' 60 + firehose.save_posts = :matching 61 + when '-da' 62 + firehose.save_posts = :all 63 + when '-nd' 64 + firehose.save_posts = false 65 + when '-r' 66 + firehose.replay_events = true 67 + when /^\-r(\d+)$/ 68 + firehose.replay_events = true 69 + firehose.start_cursor = $1.to_i 70 + when '-nr' 71 + firehose.replay_events = false 72 + when '-h', '--help' 73 + print_help 74 + exit 0 75 + else 76 + puts "Unrecognized option: #{arg}" 77 + print_help 78 + exit 1 79 + end 80 + end 81 + 82 + trap("SIGINT") { 83 + puts 84 + firehose.log "Stopping..." 85 + 86 + EM.add_timer(0) { 87 + firehose.stop 88 + } 89 + } 90 + 91 + trap("SIGTERM") { 92 + firehose.log "Shutting down the service..." 93 + 94 + EM.add_timer(0) { 95 + firehose.stop 96 + } 97 + } 98 + 99 + firehose.start
+13
bin/server
···
··· 1 + #!/usr/bin/env ruby 2 + 3 + $LOAD_PATH.unshift(File.expand_path('..', __dir__)) 4 + 5 + require 'bundler/setup' 6 + 7 + require 'app/config' 8 + require 'app/server' 9 + 10 + Server.configure 11 + 12 + BlueFactory::Server.set :port, 3000 13 + BlueFactory::Server.run!
+2 -4
config/database.yml
··· 1 development: 2 adapter: sqlite3 3 database: db/blueskydev.sqlite3 4 - pool: 5 5 - timeout: 10000 6 7 production: 8 adapter: sqlite3 9 database: db/bluesky.sqlite3 10 - pool: 5 11 - timeout: 10000
··· 1 development: 2 adapter: sqlite3 3 database: db/blueskydev.sqlite3 4 + timeout: 5000 5 6 production: 7 adapter: sqlite3 8 database: db/bluesky.sqlite3 9 + timeout: 30000
+4 -1
config/deploy.rb
··· 7 set :application, "bsky_feeds" 8 set :repository, "git@github.com:mackuba/bluesky-feeds-rb.git" 9 set :scm, :git 10 - set :keep_releases, 5 11 set :use_sudo, false 12 set :deploy_to, "/var/www/bsky_feeds" 13 set :deploy_via, :remote_cache 14 set :migrate_env, "RACK_ENV=production" 15 16 server "feeds.example.com", :app, :web, :db, :primary => true 17 ··· 34 35 task :link_shared do 36 run "ln -s #{shared_path}/bluesky.sqlite3 #{release_path}/db/bluesky.sqlite3" 37 end 38 end
··· 7 set :application, "bsky_feeds" 8 set :repository, "git@github.com:mackuba/bluesky-feeds-rb.git" 9 set :scm, :git 10 + set :keep_releases, 10 11 set :use_sudo, false 12 set :deploy_to, "/var/www/bsky_feeds" 13 set :deploy_via, :remote_cache 14 set :migrate_env, "RACK_ENV=production" 15 + set :public_children, [] 16 17 server "feeds.example.com", :app, :web, :db, :primary => true 18 ··· 35 36 task :link_shared do 37 run "ln -s #{shared_path}/bluesky.sqlite3 #{release_path}/db/bluesky.sqlite3" 38 + run "ln -s #{shared_path}/bluesky.sqlite3-shm #{release_path}/db/bluesky.sqlite3-shm" 39 + run "ln -s #{shared_path}/bluesky.sqlite3-wal #{release_path}/db/bluesky.sqlite3-wal" 40 end 41 end
+10 -3
config.ru
··· 1 require_relative 'app/config' 2 3 # might not be needed depending on the app server you use - comment out these lines to leave logs on STDOUT 4 Dir.mkdir('log') unless Dir.exist?('log') 5 - log = File.new("log/sinatra.log", "a+") 6 - log.sync = true 7 - use Rack::CommonLogger, log 8 9 run BlueFactory::Server
··· 1 require_relative 'app/config' 2 + require_relative 'app/server' 3 4 # might not be needed depending on the app server you use - comment out these lines to leave logs on STDOUT 5 Dir.mkdir('log') unless Dir.exist?('log') 6 + $sinatra_log = File.new("log/sinatra.log", "a+") 7 + 8 + # flush logs to the file immediately instead of buffering 9 + $sinatra_log.sync = true 10 + 11 + # Sinatra turns off its own logging to stdout if another logger is in the stack 12 + use Rack::CommonLogger, $sinatra_log 13 + 14 + Server.configure 15 16 run BlueFactory::Server
+10
db/migrate/20230727134424_add_subscriptions.rb
···
··· 1 + class AddSubscriptions < ActiveRecord::Migration[6.1] 2 + def change 3 + create_table :subscriptions do |t| 4 + t.string :service, null: false 5 + t.integer :cursor, null: false 6 + end 7 + 8 + add_index :subscriptions, :service, unique: true 9 + end 10 + end
+5
db/migrate/20230802222353_add_index_on_time.rb
···
··· 1 + class AddIndexOnTime < ActiveRecord::Migration[6.1] 2 + def change 3 + add_index :posts, :time 4 + end 5 + end
+7
db/migrate/20240604130301_add_posts_repo_index.rb
···
··· 1 + require_relative '../../app/models/post' 2 + 3 + class AddPostsRepoIndex < ActiveRecord::Migration[6.1] 4 + def change 5 + add_index :posts, [:repo, :time] 6 + end 7 + end
+15
db/migrate/20241017135717_add_missing_limits.rb
···
··· 1 + class AddMissingLimits < ActiveRecord::Migration[6.1] 2 + def up 3 + change_table :posts do |t| 4 + t.change :repo, :string, limit: 60, null: false 5 + t.change :rkey, :string, limit: 16, null: false 6 + end 7 + end 8 + 9 + def down 10 + change_table :posts do |t| 11 + t.change :repo, :string, null: false 12 + t.change :rkey, :string, null: false 13 + end 14 + end 15 + end
+5
db/migrate/20241022002658_add_feed_posts_post_index.rb
···
··· 1 + class AddFeedPostsPostIndex < ActiveRecord::Migration[6.1] 2 + def change 3 + add_index :feed_posts, :post_id 4 + end 5 + end
+13 -6
db/schema.rb
··· 10 # 11 # It's strongly recommended that you check this file into your version control system. 12 13 - ActiveRecord::Schema.define(version: 2023_06_15_155215) do 14 - 15 create_table "feed_posts", force: :cascade do |t| 16 t.integer "feed_id", null: false 17 t.integer "post_id", null: false 18 - t.datetime "time", null: false 19 t.index ["feed_id", "time"], name: "index_feed_posts_on_feed_id_and_time" 20 end 21 22 create_table "posts", force: :cascade do |t| 23 - t.string "repo", null: false 24 - t.datetime "time", null: false 25 t.string "text", null: false 26 t.text "data", null: false 27 - t.string "rkey", null: false 28 t.index ["rkey"], name: "index_posts_on_rkey" 29 end 30 31 end
··· 10 # 11 # It's strongly recommended that you check this file into your version control system. 12 13 + ActiveRecord::Schema[8.0].define(version: 2024_10_22_002658) do 14 create_table "feed_posts", force: :cascade do |t| 15 t.integer "feed_id", null: false 16 t.integer "post_id", null: false 17 + t.datetime "time", precision: nil, null: false 18 t.index ["feed_id", "time"], name: "index_feed_posts_on_feed_id_and_time" 19 + t.index ["post_id"], name: "index_feed_posts_on_post_id" 20 end 21 22 create_table "posts", force: :cascade do |t| 23 + t.string "repo", limit: 60, null: false 24 + t.datetime "time", precision: nil, null: false 25 t.string "text", null: false 26 t.text "data", null: false 27 + t.string "rkey", limit: 16, null: false 28 + t.index ["repo", "time"], name: "index_posts_on_repo_and_time" 29 t.index ["rkey"], name: "index_posts_on_rkey" 30 + t.index ["time"], name: "index_posts_on_time" 31 end 32 33 + create_table "subscriptions", force: :cascade do |t| 34 + t.string "service", null: false 35 + t.integer "cursor", null: false 36 + t.index ["service"], name: "index_subscriptions_on_service", unique: true 37 + end 38 end
+1 -1
dist/bsky_feeds.service
··· 6 Type=simple 7 User=alf 8 WorkingDirectory=/var/www/bsky_feeds/current 9 - ExecStart=/usr/bin/ruby firehose.rb 10 Environment="RACK_ENV=production" 11 TimeoutSec=15 12 Restart=on-failure
··· 6 Type=simple 7 User=alf 8 WorkingDirectory=/var/www/bsky_feeds/current 9 + ExecStart=/usr/bin/ruby bin/firehose 10 Environment="RACK_ENV=production" 11 TimeoutSec=15 12 Restart=on-failure
-65
firehose.rb
··· 1 - #!/usr/bin/env ruby 2 - 3 - require 'bundler/setup' 4 - require_relative 'app/firehose_stream' 5 - 6 - $stdout.sync = true 7 - 8 - ActiveRecord::Base.logger = nil 9 - 10 - def print_help 11 - puts "Usage: #{$0} [options...]" 12 - puts "Options:" 13 - puts 14 - puts " * Showing progress: [default: show in development]" 15 - puts " -p = show progress dots for each received message" 16 - puts " -np = don't show progress dots" 17 - puts 18 - puts " * Logging status changes: [default: log in any mode]" 19 - puts " -ns = don't log status changes" 20 - puts 21 - puts " * Logging post text: [default: -lm in development, -nl in production]" 22 - puts " -lm = log text of matching posts" 23 - puts " -la = log text of every post" 24 - puts " -nl = don't log posts" 25 - puts 26 - puts " * Saving posts to db: [default: -da in development, -dm in production]" 27 - puts " -da = save all posts to database" 28 - puts " -dm = save only matching posts to database" 29 - puts " -nd = don't save any posts" 30 - end 31 - 32 - firehose = FirehoseStream.new 33 - 34 - ARGV.each do |arg| 35 - case arg 36 - when '-p' 37 - firehose.show_progress = true 38 - when '-np' 39 - firehose.show_progress = false 40 - when '-ns' 41 - firehose.log_status = false 42 - when '-lm' 43 - firehose.log_posts = :matching 44 - when '-la' 45 - firehose.log_posts = :all 46 - when '-nl' 47 - firehose.log_posts = false 48 - when '-dm' 49 - firehose.save_posts = :matching 50 - when '-da' 51 - firehose.save_posts = :all 52 - when '-nd' 53 - firehose.save_posts = false 54 - when '-h', '--help' 55 - print_help 56 - exit 0 57 - else 58 - puts "Unrecognized option: #{arg}" 59 - print_help 60 - exit 1 61 - end 62 - end 63 - 64 - firehose.start 65 - sleep
···
images/kitkat.jpg

This is a binary file and will not be displayed.

images/linux_tux.png

This is a binary file and will not be displayed.

images/ruby.png

This is a binary file and will not be displayed.

+244
lib/tasks/feeds.rake
···
··· 1 + $LOAD_PATH.unshift(File.expand_path('../..', __dir__)) 2 + 3 + require 'app/config' 4 + require 'app/models/feed_post' 5 + require 'app/models/post' 6 + require 'app/post_console_printer' 7 + require 'app/utils' 8 + 9 + require 'base64' 10 + require 'json' 11 + require 'open-uri' 12 + require 'set' 13 + 14 + 15 + def get_feed 16 + if ENV['KEY'].to_s == '' 17 + puts "Please specify feed key as KEY=feedname (the part of the feed's at:// URI after the last slash)" 18 + exit 1 19 + end 20 + 21 + feed_key = ENV['KEY'] 22 + feed = BlueFactory.get_feed(feed_key) 23 + 24 + if feed.nil? 25 + puts "No feed configured for key '#{feed_key}' - use `BlueFactory.add_feed '#{feed_key}', MyFeed.new`" 26 + exit 1 27 + end 28 + 29 + feed 30 + end 31 + 32 + def make_jwt(payload) 33 + header = { typ: 'JWT', alg: 'ES256K' } 34 + sig = 'fakesig' 35 + 36 + fields = [header, payload].map { |d| Base64.encode64(JSON.generate(d)).chomp } + [sig] 37 + fields.join('.') 38 + end 39 + 40 + desc "Print posts in the feed, starting from the newest ones (limit = N)" 41 + task :print_feed do 42 + feed = get_feed 43 + limit = ENV['N'] ? ENV['N'].to_i : 100 44 + 45 + posts = FeedPost.where(feed_id: feed.feed_id).joins(:post).order('feed_posts.time DESC').limit(limit).map(&:post) 46 + 47 + # this fixes an error when piping a long output to less and then closing without reading it all 48 + Signal.trap("SIGPIPE", "SYSTEM_DEFAULT") 49 + 50 + printer = PostConsolePrinter.new(feed) 51 + 52 + posts.each do |s| 53 + printer.display(s) 54 + end 55 + end 56 + 57 + desc "Print feed by making an HTTP connection to the XRPC endpoint" 58 + task :test_feed do 59 + feed = get_feed 60 + limit = ENV['N'] ? ENV['N'].to_i : 100 61 + actor = ENV['DID'] || BlueFactory.publisher_did 62 + jwt = make_jwt({ iss: actor }) 63 + 64 + puts "Loading feed..." 65 + 66 + feed_uri = "at://#{BlueFactory.publisher_did}/app.bsky.feed.generator/#{ENV['KEY']}" 67 + port = ENV['PORT'] || BlueFactory::Server.settings.port 68 + url = "http://localhost:#{port}/xrpc/app.bsky.feed.getFeedSkeleton?limit=#{limit}&feed=#{feed_uri}" 69 + headers = { 'Authorization' => "Bearer #{jwt}" } 70 + 71 + json = JSON.parse(URI.open(url, headers).read) 72 + post_uris = json['feed'].map { |x| x['post'] } 73 + 74 + puts "Loading posts..." 75 + 76 + posts = post_uris.map { |uri| Post.find_by_at_uri(uri) }.compact 77 + 78 + Signal.trap("SIGPIPE", "SYSTEM_DEFAULT") 79 + printer = PostConsolePrinter.new(feed) 80 + 81 + posts.each do |s| 82 + printer.display(s) 83 + end 84 + end 85 + 86 + desc "Remove a single post from a feed" 87 + task :delete_feed_item do 88 + feed = get_feed 89 + 90 + if ENV['URL'].to_s == '' 91 + puts "Please specify post url as URL=https://bsky.app/..." 92 + exit 1 93 + end 94 + 95 + url = ENV['URL'] 96 + parts = url.gsub(/^https:\/\//, '').split('/') 97 + author = parts[2] 98 + rkey = parts[4] 99 + 100 + if author.start_with?('did:') 101 + did = author 102 + handle = Utils.handle_from_did(did) 103 + else 104 + handle = author 105 + did = Utils.did_from_handle(handle) 106 + end 107 + 108 + if item = FeedPost.joins(:post).find_by(feed_id: feed.feed_id, post: { repo: did, rkey: rkey }) 109 + item.destroy 110 + puts "Deleted post by @#{handle} from #{feed.display_name} feed" 111 + else 112 + puts "Post not found in the feed" 113 + end 114 + end 115 + 116 + desc "Rescan all posts and rebuild the feed from scratch (DAYS = number of days)" 117 + task :rebuild_feed do 118 + feed = get_feed 119 + method = ENV['UNSAFE'] ? :tap : :transaction 120 + dry = !!ENV['DRY_RUN'] 121 + 122 + if ENV['ONLY_EXISTING'] && ENV['APPEND_ONLY'] 123 + raise "APPEND_ONLY cannot be used together with ONLY_EXISTING" 124 + end 125 + 126 + ActiveRecord::Base.send(method) do 127 + if ENV['ONLY_EXISTING'] 128 + rescan_feed_items(feed, dry) 129 + else 130 + days = ENV['DAYS'] ? ENV['DAYS'].to_i : 7 131 + append_only = !!ENV['APPEND_ONLY'] 132 + 133 + matched_posts = rebuild_feed(feed, days, append_only, dry) 134 + 135 + if matched_posts && (filename = ENV['TO_FILE']) 136 + File.write(filename, matched_posts.map(&:id).to_json) 137 + end 138 + end 139 + end 140 + end 141 + 142 + def rescan_feed_items(feed, dry = false) 143 + feed_posts = FeedPost.where(feed_id: feed.feed_id).includes(:post).order('time DESC').to_a 144 + total = feed_posts.length 145 + 146 + puts "Processing posts..." 147 + 148 + deleted = [] 149 + 150 + feed_posts.each do |fp| 151 + if !feed.post_matches?(fp.post) 152 + if !dry 153 + puts "Deleting from feed: ##{fp.post.id} \"#{fp.post.text}\"" 154 + fp.destroy 155 + end 156 + 157 + deleted << fp.post 158 + end 159 + end 160 + 161 + if dry 162 + Signal.trap("SIGPIPE", "SYSTEM_DEFAULT") 163 + printer = PostConsolePrinter.new(feed) 164 + 165 + puts 166 + puts Rainbow("Posts to delete:").red 167 + puts Rainbow("==============================").red 168 + puts 169 + 170 + deleted.each do |p| 171 + printer.display(p) 172 + end 173 + 174 + puts Rainbow("#{deleted.length} post(s) would be deleted.").red 175 + else 176 + puts "Done (#{deleted.length} post(s) deleted)." 177 + end 178 + end 179 + 180 + def rebuild_feed(feed, days, append_only, dry = false) 181 + if append_only 182 + feed_posts = FeedPost.where(feed_id: feed.feed_id) 183 + current_post_ids = Set.new(feed_posts.pluck('post_id')) 184 + elsif !dry 185 + print "This will erase and replace the contents of the feed. Continue? [y/n]: " 186 + answer = STDIN.readline 187 + exit unless answer.strip.downcase == 'y' 188 + 189 + puts "Cleaning up feed..." 190 + FeedPost.where(feed_id: feed.feed_id).delete_all 191 + current_post_ids = [] 192 + end 193 + 194 + puts "Counting posts..." 195 + posts = Post.order('time, id') 196 + start = posts.where("time <= DATETIME('now', '-#{days} days')").last 197 + total = start ? Post.where("time > DATETIME('now', '-#{days} days')").count : Post.count 198 + 199 + offset = 0 200 + page = 100000 201 + matched_posts = [] 202 + 203 + loop do 204 + batch = if start 205 + posts.where("time > ? OR (time = ? AND id > ?)", start.time, start.time, start.id).limit(page).to_a 206 + else 207 + posts.limit(page).to_a 208 + end 209 + 210 + break if batch.empty? 211 + 212 + batch.each_with_index do |post, i| 213 + $stderr.print "Processing posts... [#{offset + i + 1}/#{total}]\r" if i % 100 == 99 214 + $stderr.flush 215 + 216 + if !current_post_ids.include?(post.id) && feed.post_matches?(post) 217 + matched_posts << post 218 + FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time) unless dry 219 + end 220 + end 221 + 222 + offset += page 223 + start = batch.last 224 + end 225 + 226 + $stderr.puts "Processing posts... Done." + " " * 30 227 + 228 + if dry || ENV['VERBOSE'] 229 + if append_only 230 + puts (dry ? "Posts to add: " : "Added posts: ") + matched_posts.length.to_s 231 + puts "==============================" 232 + puts 233 + end 234 + 235 + Signal.trap("SIGPIPE", "SYSTEM_DEFAULT") 236 + printer = PostConsolePrinter.new(feed) 237 + 238 + matched_posts.reverse.each do |p| 239 + printer.display(p) 240 + end 241 + 242 + matched_posts 243 + end 244 + end
+29
lib/tasks/posts.rake
···
··· 1 + $LOAD_PATH.unshift(File.expand_path('../..', __dir__)) 2 + 3 + require 'app/config' 4 + require 'app/models/post' 5 + 6 + 7 + desc "Delete posts older than N days that aren't included in a feed" 8 + task :cleanup_posts do 9 + days = ENV['DAYS'].to_i 10 + if days <= 0 11 + puts "Please specify number of days as e.g. DAYS=30 to delete posts older than that" 12 + exit 1 13 + end 14 + 15 + result = ActiveRecord::Base.connection.execute("SELECT DATETIME('now', '-#{days} days') AS time_limit") 16 + time_limit = result.first['time_limit'] 17 + 18 + subquery = %{ 19 + SELECT posts.id FROM posts 20 + LEFT JOIN feed_posts ON (feed_posts.post_id = posts.id) 21 + WHERE feed_posts.id IS NULL AND posts.time < DATETIME('now', '-#{days} days') 22 + } 23 + 24 + time_start = Time.now 25 + result = Post.where("id IN (#{subquery})").delete_all 26 + time_end = Time.now 27 + 28 + puts "#{time_end}: Deleted #{result} posts older than #{time_limit} in #{(time_end - time_start)}s" 29 + end
-8
server.rb
··· 1 - #!/usr/bin/env ruby 2 - 3 - require 'bundler/setup' 4 - 5 - require_relative 'app/config' 6 - 7 - BlueFactory::Server.set :port, 3000 8 - BlueFactory::Server.run!
···