Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'time'
require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'utils'

class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
  POSTS_BATCH_SIZE = 100

  include Utils

  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    @show_progress = (@env == :development)
    @log_status = true
    @log_posts = (@env == :development) ? :matching : false
    @save_posts = (@env == :development) ? :all : :matching
    @replay_events = (@env != :development)

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
    @message_counter = 0
  end

  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    @sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message do |m|
      process_message(m)
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect {
        @message_counter = 0
        @replaying = !!(cursor)
        log "Connected ✓"
      }

      @sky.on_disconnect {
        log "Disconnected."
      }

      @sky.on_reconnect {
        log "Connection lost, reconnecting..."
      }

      @sky.on_timeout {
        log "Trying to reconnect..."
      }

      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  def stop
    save_queued_posts
    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  def process_message(msg)
    if msg.type == :info
      # ATProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"

    elsif msg.type == :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"

    elsif msg.type == :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)

    elsif msg.unknown?
      log "Unknown message type: #{msg.type} (#{msg.seq})"
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    @message_counter += 1

    if @message_counter % 100 == 0
      # save current cursor every 100 events
      save_cursor(msg.seq)
    end

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts table, then add/remove records here depending on op.action
        # (you'll need to track like records and not just have a single numeric "likes" field,
        # because delete events only include the uri/rkey of the like, not of the liked post)
        # (see the commented sketch below this method)

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end
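
  # A rough, commented sketch of what like tracking could look like if you add
  # 'app.bsky.feed.like' to wanted_collections and call this from the :bsky_like
  # branch above. It assumes a hypothetical Like model with repo/rkey/subject_uri
  # columns, which is NOT part of this template; a follows table would work the
  # same way. Delete events only carry the like's own repo and rkey, so the record
  # is looked up by those two fields:
  #
  #   def process_like(msg, op)
  #     case op.action
  #     when :create
  #       subject_uri = op.raw_record && op.raw_record.dig('subject', 'uri')
  #       Like.create!(repo: op.repo, rkey: op.rkey, subject_uri: subject_uri, time: msg.time)
  #     when :delete
  #       Like.find_by(repo: op.repo, rkey: op.rkey)&.destroy
  #     end
  #   end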
118 log "Unknown message type: #{msg.type} (#{msg.seq})" 119 end 120 121 return unless msg.type == :commit 122 123 if @replaying 124 log "Replaying events since #{msg.time.getlocal} -->" 125 @replaying = false 126 end 127 128 @message_counter += 1 129 130 if @message_counter % 100 == 0 131 # save current cursor every 100 events 132 save_cursor(msg.seq) 133 end 134 135 msg.operations.each do |op| 136 case op.type 137 when :bsky_post 138 process_post(msg, op) 139 140 when :bsky_like, :bsky_repost 141 # if you want to use the number of likes and/or reposts for filtering or sorting: 142 # add a likes/reposts table, then add/remove records here depending on op.action 143 # (you'll need to track like records and not just have a single numeric "likes" field, 144 # because delete events only include the uri/rkey of the like, not of the liked post) 145 146 when :bsky_follow 147 # if you want to make a personalized feed that needs info about given user's follows/followers: 148 # add a followers table, then add/remove records here depending on op.action 149 150 else 151 # other types like :bsky_block, :bsky_profile (includes profile edits) 152 end 153 end 154 end 155 156 def process_account_message(msg) 157 if msg.status == :deleted 158 # delete all data we have stored about this account 159 FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all 160 Post.where(repo: msg.did).delete_all 161 end 162 end 163 164 def process_post(msg, op) 165 if op.action == :delete 166 if post = Post.find_by_repo_rkey(op.repo, op.rkey) 167 post.destroy 168 end 169 end 170 171 return unless op.action == :create 172 173 begin 174 if op.raw_record.nil? 175 log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})" 176 return 177 end 178 rescue CBOR::UnpackError => e 179 log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}" 180 return 181 end 182 183 # ignore posts with past date from Twitter etc. imported using some kind of tool 184 begin 185 post_time = Time.parse(op.raw_record['createdAt']) 186 return if post_time < msg.time - 86400 187 rescue StandardError => e 188 log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})" 189 return 190 end 191 192 record = op.raw_record 193 text = record['text'] 194 195 # to save space, delete redundant post text and type from the saved data JSON 196 record.delete('$type') 197 record.delete('text') 198 trimmed_json = JSON.generate(record) 199 200 # tip: if you don't need full record data for debugging, delete the data column in posts 201 post = Post.new( 202 repo: op.repo, 203 time: msg.time, 204 text: text, 205 rkey: op.rkey, 206 data: trimmed_json, 207 record: record 208 ) 209 210 if !post.valid? 211 if post.errors.has_key?(:data) 212 post.trim_too_long_data 213 end 214 215 if !post.valid? 
216 log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}" 217 return 218 end 219 end 220 221 matched = false 222 223 @feeds.each do |feed| 224 if feed.post_matches?(post) 225 post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) unless !@save_posts 226 matched = true 227 end 228 end 229 230 if @log_posts == :all || @log_posts && matched 231 puts 232 puts text 233 end 234 235 if @save_posts == :all 236 # wait until we have 100 posts and then save them all in one insert, if possible 237 @post_queue << post 238 239 if @post_queue.length >= POSTS_BATCH_SIZE 240 save_queued_posts 241 end 242 elsif @save_posts == :matching && matched 243 # save immediately because matched posts might be rare; we've already checked validations 244 post.save!(validate: false) 245 end 246 247 print '.' if @show_progress && @log_posts != :all 248 rescue StandardError => e 249 log "Error in #process_post: #{e}" 250 251 unless e.message == "nesting of 100 is too deep" 252 log msg.inspect 253 log e.backtrace.reject { |x| x.include?('/ruby/') } 254 end 255 end 256 257 def save_queued_posts 258 # we can only squash posts into one insert statement if they don't have nested feed_posts 259 # so we save those without feed_posts first: 260 261 matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 } 262 263 if unmatched.length > 0 264 values = unmatched.map { |p| p.attributes.except('id') } 265 Post.insert_all(values) 266 end 267 268 @post_queue = matched 269 return if @post_queue.empty? 270 271 # and for those that do have feed_posts, we save them normally, in one transaction: 272 273 ActiveRecord::Base.transaction do 274 @post_queue.each do |p| 275 # skip validations since we've checked the posts before adding them to the queue 276 p.save!(validate: false) 277 end 278 end 279 280 @post_queue = [] 281 rescue StandardError => e 282 # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which 283 # aren't covered by AR validations; so in that case, try to save any valid posts one by one: 284 285 @post_queue.each do |p| 286 begin 287 ActiveRecord::Base.transaction do 288 p.save!(validate: false) 289 end 290 rescue StandardError => e 291 log "Error in #save_queued_posts: #{e}" 292 293 unless e.message == "nesting of 100 is too deep" 294 log p.inspect 295 log e.backtrace.reject { |x| x.include?('/ruby/') } 296 end 297 end 298 end 299 300 @post_queue = [] 301 end 302 303 def log(text) 304 puts if @show_progress 305 puts "[#{Time.now}] #{text}" 306 end 307 308 def inspect 309 vars = instance_variables - [:@feeds] 310 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 311 "#<#{self.class}:0x#{object_id} #{values}>" 312 end 313end