Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'time'
require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'

# Consumes the Bluesky Jetstream event stream, matches incoming posts against every
# feed that is currently updating, and stores posts in the database in batches.
#
# Settings (assign after .new, before #start):
#   start_cursor  - cursor to resume from instead of the one saved in Subscription
#   show_progress - print a '.' for every processed post
#   log_status    - log connection status changes and stream errors
#   log_posts     - false, :matching or :all - which post texts to print
#   save_posts    - false, :matching or :all - which posts to store
#   replay_events - when true, connect with a saved/start cursor to replay missed events
class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
  POSTS_BATCH_SIZE = 100

  # @param service [String, nil] Jetstream host to connect to; defaults to DEFAULT_JETSTREAM
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
  end

  # Connects to the Jetstream service and starts processing messages.
  # Does nothing if a connection has already been started.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    # `sky` is kept as a local so the disconnect handler still works after #stop clears @sky
    @sky = sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message do |m|
      process_message(m)
    end

    # Persisting the cursor on disconnect must not depend on status logging being enabled,
    # otherwise events since the last periodic save get processed again on the next start.
    @sky.on_disconnect do
      log "Disconnected." if @log_status
      save_cursor(sky.cursor)
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect do
        @replaying = !!cursor
        log "Connected ✓"
      end

      @sky.on_timeout { log "Trying to reconnect..." }
      @sky.on_reconnect { log "Connection lost, reconnecting..." }
      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Flushes any queued posts and closes the connection.
  def stop
    save_queued_posts
    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor saved for this service. If no Subscription row exists yet, it
  # creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores the given cursor on this service's Subscription row.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Dispatches one stream message. Non-commit messages are logged or handled and then
  # ignored. For commit messages, the cursor is saved whenever seq is divisible by 10,
  # and each operation is routed by its type.
  def process_message(msg)
    case msg.type
    when :info
      # AtProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"
    when :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"
    when :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)
    else
      # NOTE(review): this stream is a Skyfall::Jetstream; confirm whether its unknown
      # messages are instances of the Firehose class checked here
      log "Unknown message type: #{msg.type} (#{msg.seq})" if msg.is_a?(Skyfall::Firehose::UnknownMessage)
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    save_cursor(msg.seq) if msg.seq % 10 == 0

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # When an account is reported as deleted, removes all stored posts of that account
  # together with their feed assignments.
  def process_account_message(msg)
    if msg.status == :deleted
      # delete all data we have stored about this account
      FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
      Post.where(repo: msg.did).delete_all
    end
  end

  # Handles one post operation:
  # - :delete removes the stored post, if any;
  # - :create validates the record, skips posts whose createdAt is more than a day
  #   before the event time, matches the post against the feeds, and queues it for
  #   saving according to @save_posts.
  # Any StandardError is logged and swallowed so one bad post doesn't stop the stream.
  def process_post(msg, op)
    if op.action == :delete
      if post = Post.find_by_repo_rkey(op.repo, op.rkey)
        post.destroy
      end
    end

    return unless op.action == :create

    begin
      if op.raw_record.nil?
        log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(op.raw_record['createdAt'])
      return if post_time < msg.time - 86_400 # one day, in seconds
    rescue StandardError
      log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = op.raw_record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    trimmed_record = op.raw_record.dup
    trimmed_record.delete('$type')
    trimmed_record.delete('text')
    trimmed_json = JSON.generate(trimmed_record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: op.raw_record
    )

    if !post.valid?
      post.trim_too_long_data if post.errors.has_key?(:data)

      if !post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      if feed.post_matches?(post)
        post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) if @save_posts
        matched = true
      end
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    @post_queue << post if @save_posts == :all || (@save_posts && matched)

    # wait until we have 100 posts and then save them all in one insert, if possible
    save_queued_posts if @post_queue.length >= POSTS_BATCH_SIZE

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log "Error in #process_post: #{e}"

    unless e.message == "nesting of 100 is too deep"
      log msg.inspect
      log e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  # Writes all queued posts to the database and empties the queue.
  # Posts without feed_posts go in a single insert_all. Posts with feed_posts are saved
  # one by one in a single transaction. If that fails, each remaining post is retried
  # individually and any errors are logged.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:

    matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 }

    if unmatched.length > 0
      values = unmatched.map { |p| p.attributes.except('id') }
      Post.insert_all(values)
    end

    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:

    ActiveRecord::Base.transaction do
      @post_queue.each do |p|
        # skip validations since we've checked the posts before adding them to the queue
        p.save!(validate: false)
      end
    end

    @post_queue = []
  rescue StandardError
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:

    @post_queue.each do |p|
      begin
        ActiveRecord::Base.transaction do
          p.save!(validate: false)
        end
      rescue StandardError => e
        log "Error in #save_queued_posts: #{e}"

        unless e.message == "nesting of 100 is too deep"
          log p.inspect
          log e.backtrace.reject { |x| x.include?('/ruby/') }
        end
      end
    end

    @post_queue = []
  end

  # Prints a timestamped line. When progress dots are enabled, it first ends the
  # current line of dots.
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Compact inspect output that leaves out the (potentially large) @feeds list.
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end
end