Template of a custom feed generator service for the Bluesky network in Ruby
at 0.2 4.2 kB view raw
require 'json'
require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'

# Opens a Skyfall stream to the configured service, runs every created post
# through the feeds returned by BlueFactory and stores the matches as FeedPost
# records. The last processed sequence number is periodically written to the
# Subscription record for the service.
#
# Options (all readable and writable through accessors):
# * show_progress - print a "." for each processed post
# * log_status    - print connection status changes and errors
# * log_posts     - :all, :matching (or any other truthy value) or false
# * save_posts    - :all, :matching (or any other truthy value) or false
# * replay_events - when true, connect using the cursor stored in the database
class FirehoseStream
  attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  # Picks defaults based on APP_ENV / RACK_ENV (falling back to :development)
  # and loads the list of feeds that should be updated.
  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = 'bsky.social'

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
  end

  # Creates the stream, installs the callbacks and connects.
  # Does nothing if a stream has already been created and not stopped.
  def start
    return if @sky

    # always called, even when not replaying, so that the Subscription row exists
    last_cursor = load_or_init_cursor
    cursor = @replay_events ? last_cursor : nil

    @sky = Skyfall::Stream.new(@service, :subscribe_repos, cursor)
    @sky.on_message { |m| handle_message(m) }

    if @log_status
      @sky.on_connecting { |u| puts "Connecting to #{u}..." }
      @sky.on_connect do
        # makes handle_message announce the replay on the next commit
        @replaying = !!cursor
        puts "Connected #{Time.now}"
      end
      @sky.on_disconnect do
        puts
        puts "Disconnected #{Time.now}"
      end
      @sky.on_reconnect { puts "Connection lost, reconnecting..." }
      @sky.on_error { |e| puts "ERROR: #{Time.now} #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Disconnects the stream (if any) and forgets it, so that #start can be called again.
  def stop
    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor saved in the Subscription record for the service.
  # If there is no such record yet, creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    subscription = Subscription.find_by(service: @service)
    return subscription.cursor if subscription

    Subscription.create!(service: @service, cursor: 0)
    nil
  end

  # Writes the given cursor to the Subscription record(s) for the service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Callback for every message received from the stream. Logs info/unknown
  # messages; for :commit messages saves the cursor on every 10th sequence
  # number and dispatches each contained operation by its type.
  def handle_message(msg)
    case msg
    when Skyfall::InfoMessage
      # AtProto error, the only one right now is "OutdatedCursor"
      puts "InfoMessage: #{msg}"
    when Skyfall::HandleMessage
      # use these events if you want to track handle changes:
      # puts "Handle change: #{msg.repo} => #{msg.handle}"
    when Skyfall::UnknownMessage
      puts "Unknown message type: #{msg.type}"
    end

    return unless msg.type == :commit

    if @replaying
      puts "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    save_cursor(msg.seq) if (msg.seq % 10).zero?

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles a single post operation from a commit message.
  #
  # * :delete - destroys the matching Post record, if one is stored
  # * :create - builds a Post, checks it against every feed, and saves/logs it
  #             according to save_posts / log_posts
  # * any other action is ignored
  #
  # Errors raised while handling a created post are printed and swallowed.
  def process_post(msg, op)
    delete_post(op) if op.action == :delete

    return unless op.action == :create

    begin
      text = op.raw_record['text']

      # tip: if you don't need full record data for debugging, delete the data column in posts
      post = Post.new(
        repo: op.repo,
        time: msg.time,
        text: text,
        rkey: op.rkey,
        data: JSON.generate(op.raw_record),
        record: op.raw_record
      )

      matched = false

      @feeds.each do |feed|
        next unless feed.post_matches?(post)

        FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) if @save_posts
        matched = true
      end

      if @log_posts == :all || (@log_posts && matched)
        puts
        puts text
      end

      post.save! if @save_posts == :all
    rescue StandardError => e
      puts "Error: #{e}"

      # the message dump is skipped in production and for the "nesting ... too deep"
      # error (presumably the JSON nesting limit hit by JSON.generate above)
      p msg unless @env == :production || e.message == "nesting of 100 is too deep"
    end

    print '.' if @show_progress && @log_posts != :all
  end

  private

  # Destroys the stored Post identified by the operation's repo and rkey, if there is one.
  def delete_post(op)
    Post.find_by(repo: op.repo, rkey: op.rkey)&.destroy
  end
end