Template of a custom feed generator service for the Bluesky network in Ruby

save cursor and use it on reconnect to replay events

+31 -3
app/firehose_stream.rb
··· 5 5 require_relative 'config' 6 6 require_relative 'models/feed_post' 7 7 require_relative 'models/post' 8 + require_relative 'models/subscription' 8 9 9 10 class FirehoseStream 10 - attr_accessor :show_progress, :log_status, :log_posts, :save_posts 11 + attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events 11 12 12 13 def initialize 13 14 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym 15 + @service = 'bsky.social' 14 16 15 17 @show_progress = (@env == :development) ? true : false 16 18 @log_status = true 17 19 @log_posts = (@env == :development) ? :matching : false 18 20 @save_posts = (@env == :development) ? :all : :matching 21 + @replay_events = (@env == :development) ? false : true 19 22 20 23 @feeds = BlueFactory.all_feeds 21 24 end ··· 23 26 def start 24 27 return if @sky 25 28 26 - @sky = Skyfall::Stream.new('bsky.social', :subscribe_repos) 29 + last_cursor = load_or_init_cursor 30 + cursor = @replay_events ? last_cursor : nil 31 + 32 + @sky = Skyfall::Stream.new(@service, :subscribe_repos, cursor) 27 33 28 34 @sky.on_message do |m| 29 35 handle_message(m) 30 36 end 31 37 32 38 if @log_status 33 - @sky.on_connect { puts "Connected #{Time.now} ✓" } 39 + @sky.on_connect { @replaying = !!(cursor); puts "Connected #{Time.now} ✓" } 34 40 @sky.on_disconnect { puts; puts "Disconnected #{Time.now}" } 35 41 @sky.on_reconnect { puts "Reconnecting..." } 36 42 @sky.on_error { |e| puts "ERROR: #{Time.now} #{e}" } ··· 44 50 @sky = nil 45 51 end 46 52 53 + def load_or_init_cursor 54 + if sub = Subscription.find_by(service: @service) 55 + sub.cursor 56 + else 57 + Subscription.create!(service: @service, cursor: 0) 58 + nil 59 + end 60 + end 61 + 62 + def save_cursor(cursor) 63 + Subscription.where(service: @service).update_all(cursor: cursor) 64 + end 65 + 47 66 def handle_message(msg) 67 + if msg.seq % 10 == 0 68 + save_cursor(msg.seq) 69 + end 70 + 71 + if @replaying 72 + puts "Replaying events since #{msg.time.getlocal} -->" 73 + @replaying = false 74 + end 75 + 48 76 return if msg.type != :commit 49 77 50 78 msg.operations.each do |op|
+5
app/models/subscription.rb
··· 1 + require 'active_record' 2 + 3 + class Subscription < ActiveRecord::Base 4 + validates_presence_of :service, :cursor 5 + end
+10
db/migrate/20230727134424_add_subscriptions.rb
··· 1 + class AddSubscriptions < ActiveRecord::Migration[6.1] 2 + def change 3 + create_table :subscriptions do |t| 4 + t.string :service, null: false 5 + t.integer :cursor, null: false 6 + end 7 + 8 + add_index :subscriptions, :service, unique: true 9 + end 10 + end
+7 -1
db/schema.rb
··· 10 10 # 11 11 # It's strongly recommended that you check this file into your version control system. 12 12 13 - ActiveRecord::Schema.define(version: 2023_06_15_155215) do 13 + ActiveRecord::Schema.define(version: 2023_07_27_134424) do 14 14 15 15 create_table "feed_posts", force: :cascade do |t| 16 16 t.integer "feed_id", null: false ··· 26 26 t.text "data", null: false 27 27 t.string "rkey", null: false 28 28 t.index ["rkey"], name: "index_posts_on_rkey" 29 + end 30 + 31 + create_table "subscriptions", force: :cascade do |t| 32 + t.string "service", null: false 33 + t.integer "cursor", null: false 34 + t.index ["service"], name: "index_subscriptions_on_service", unique: true 29 35 end 30 36 31 37 end
+8
firehose.rb
··· 27 27 puts " -da = save all posts to database" 28 28 puts " -dm = save only matching posts to database" 29 29 puts " -nd = don't save any posts" 30 + puts 31 + puts " * Replaying missed events: [default: -nr in development, -r in production]" 32 + puts " -r = pass a cursor param when connecting to replay any missed events" 33 + puts " -nr = don't replay missed events" 30 34 end 31 35 32 36 firehose = FirehoseStream.new ··· 51 55 firehose.save_posts = :all 52 56 when '-nd' 53 57 firehose.save_posts = false 58 + when '-r' 59 + firehose.replay_events = true 60 + when '-nr' 61 + firehose.replay_events = false 54 62 when '-h', '--help' 55 63 print_help 56 64 exit 0