require 'skyfall'

require_relative 'init'
require_relative 'models/import_job'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'models/user'

# Connects to a relay firehose (or a Jetstream server) through Skyfall and
# feeds like / repost / post operations of the loaded "active" users into
# their `likes`, `reposts`, `quotes` and `pins` collections. The stream
# position is persisted in the Subscription row matching the service host.
class FirehoseClient
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'
  DEFAULT_USER_AGENT = "Lycan (https://tangled.sh/@mackuba.eu/lycan)"

  # The cursor is persisted whenever a message's seq is a multiple of this.
  CURSOR_SAVE_INTERVAL = 1000

  # A gap between updates longer than this many seconds gets logged.
  STALL_THRESHOLD = 30

  # Period (seconds) of the timer which reports a stalled stream.
  LIVE_CHECK_INTERVAL = 20

  # Period (seconds) of the timer which picks up pending ImportJob records.
  JOBS_POLL_INTERVAL = 3

  # Pause (seconds) after an ActiveRecord::ConnectionFailed error.
  DB_RETRY_DELAY = 5

  # Picks the service host from the environment: RELAY_HOST wins over
  # JETSTREAM_HOST (which also switches the client to Jetstream mode);
  # with neither set, DEFAULT_RELAY is used.
  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym

    if ENV['RELAY_HOST']
      @service = ENV['RELAY_HOST']
    elsif ENV['JETSTREAM_HOST']
      @service = ENV['JETSTREAM_HOST']
      @jetstream = true
    else
      @service = DEFAULT_RELAY
    end
  end

  # Loads the active users, builds the Skyfall client, registers its callbacks
  # and connects. Does nothing when a client already exists.
  #
  # The stream starts from `start_cursor` when it was set, otherwise from the
  # cursor stored for this service (nil on the very first run).
  def start
    return if @sky

    @active_users = load_users

    log "Starting firehose process (YJIT = #{yjit_enabled? ? 'on' : 'off'})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = build_client(cursor)

    user_agent = ENV['FIREHOSE_USER_AGENT'] || DEFAULT_USER_AGENT
    @sky.user_agent = "#{user_agent} #{@sky.version_string}"
    @sky.check_heartbeat = true

    register_callbacks
    @sky.connect
  end

  # Persists the client's current cursor and disconnects. Safe to call when
  # the client was never started.
  def stop
    return if @sky.nil?

    save_cursor(@sky.cursor)
    @sky.disconnect
    @sky = nil
  end

  # @return [Object, nil] the cursor stored for this service; when no
  #   Subscription row exists yet, one is created with cursor 0 and nil is
  #   returned instead
  def load_or_init_cursor
    subscription = Subscription.find_by(service: @service)
    return subscription.cursor if subscription

    Subscription.create!(service: @service, cursor: 0)
    nil
  end

  # Stores the given cursor on every Subscription row of this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # @return [Hash] the records of `User.active`, keyed by their `did`
  def load_users
    User.active.map { |user| [user.did, user] }.to_h
  end

  # Dispatches a single stream message by its type. Records which fail to
  # decode (CBOR::UnpackError) are silently skipped.
  def process_message(msg)
    # Guard against a missing sequence number: a nil seq would raise
    # NoMethodError here, which the rescue below does not cover.
    # NOTE(review): presumably :info frames carry no seq - confirm against Skyfall.
    seq = msg.seq
    save_cursor(seq) if seq && (seq % CURSOR_SAVE_INTERVAL).zero?

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      process_commit(msg)
    end
  rescue CBOR::UnpackError
    # ignore invalid records
  end

  # Removes the local user (and its entry in the active users hash) when the
  # account event reports the :deleted status. Other statuses are ignored.
  def process_account_event(msg)
    return unless msg.status == :deleted

    user = User.find_by(did: msg.repo)
    return unless user

    user.destroy
    @active_users.delete_if { |_did, active| active.id == user.id }
  end

  # Imports a created like, or deletes a removed one, for the current user.
  # Errors are logged, not raised.
  def process_like(msg, op)
    return unless @current_user

    case op.action
    when :create
      return if op.raw_record.nil?

      @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
    when :delete
      @current_user.likes.where(rkey: op.rkey).delete_all
    end
  rescue StandardError => e
    handle_processing_error(:process_like, msg, op, e)
  end

  # Imports a created repost, or deletes a removed one, for the current user.
  # Errors are logged, not raised.
  def process_repost(msg, op)
    return unless @current_user

    case op.action
    when :create
      return if op.raw_record.nil?

      @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
    when :delete
      @current_user.reposts.where(rkey: op.rkey).delete_all
    end
  rescue StandardError => e
    handle_processing_error(:process_repost, msg, op, e)
  end

  # Handles a post operation. A created post is passed to the current user's
  # quotes and pins importers. A deleted post removes the matching quote/pin
  # rows and, whether or not the author is an active user, destroys the stored
  # Post with that URI. Errors are logged, not raised.
  def process_post(msg, op)
    case op.action
    when :create
      return if op.raw_record.nil?

      if @current_user
        @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
        @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
      end
    when :delete
      if @current_user
        @current_user.quotes.where(rkey: op.rkey).delete_all
        @current_user.pins.where(rkey: op.rkey).delete_all
      end

      post = Post.find_by_at_uri(op.uri)
      post.destroy if post
    end
  rescue StandardError => e
    handle_processing_error(:process_post, msg, op, e)
  end

  # Prints a timestamped line to standard output.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Like the default #inspect, but leaves out the two EventMachine timers.
  # NOTE(review): object_id is interpolated in decimal despite the "0x" prefix.
  def inspect
    vars = instance_variables - [:@jobs_timer, :@live_check_timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end

  private

  # True when YJIT is available and switched on. RubyVM::YJIT is not defined
  # on every Ruby build, so the constant must be checked before it is used.
  def yjit_enabled?
    defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled?
  end

  # Builds the Skyfall client matching the configured mode.
  def build_client(cursor)
    if @jetstream
      Skyfall::Jetstream.new(@service, {
        cursor: cursor,
        wanted_collections: [:bsky_post, :bsky_like, :bsky_repost]
      })
    else
      Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    end
  end

  # Wires up all connection lifecycle and message callbacks on @sky.
  def register_callbacks
    @sky.on_message { |m| handle_incoming(m) }

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect {
      log "Connected ✓"

      @replaying = true
      @last_update = Time.now

      start_timers
    }

    @sky.on_disconnect {
      log "Disconnected."
    }

    @sky.on_reconnect {
      log "Connection lost, reconnecting..."

      # NOTE(review): @timer is never assigned anywhere in this class, so this
      # looks like a leftover; kept to preserve the existing behaviour.
      @timer&.cancel
      @timer = nil
    }

    @sky.on_timeout {
      log "Trying to reconnect..."
    }

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
  end

  # Logs when messages resume after a long gap, records the time of this
  # update and hands the message over to #process_message.
  def handle_incoming(msg)
    start_time = Time.now
    diff = start_time - @last_update

    if diff > STALL_THRESHOLD
      log "Receiving messages again after #{format('%.1f', diff)}s, starting from #{msg.time.getlocal}"
    end

    @last_update = start_time
    process_message(msg)
  end

  # Creates the two periodic timers once; `||=` keeps the existing ones when
  # the connection is re-established.
  def start_timers
    @live_check_timer ||= EM::PeriodicTimer.new(LIVE_CHECK_INTERVAL) do
      diff = Time.now - @last_update
      log "Timer: last update #{format('%.1f', diff)}s ago" if diff > STALL_THRESHOLD
    end

    @jobs_timer ||= EM::PeriodicTimer.new(JOBS_POLL_INTERVAL) do
      process_import_jobs
    end
  end

  # For every pending ImportJob: registers its user as active, lets the job
  # create its imports and then destroys the job record.
  def process_import_jobs
    ImportJob.all.each do |job|
      @active_users[job.user.did] = job.user
      job.create_imports
      job.destroy
    end
  end

  # Handles a :commit message: announces the start of a replay once per
  # connection, looks up the active user owning the repo and dispatches each
  # operation by its record type. Other record types are ignored.
  def process_commit(msg)
    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    @current_user = @active_users[msg.repo]

    msg.operations.each do |op|
      case op.type
      when :bsky_like
        process_like(msg, op)
      when :bsky_repost
        process_repost(msg, op)
      when :bsky_post
        process_post(msg, op)
      end
    end
  end

  # Shared error reporting of the process_* handlers: logs the error with the
  # message seq and record URI, then the backtrace lines not containing
  # '/ruby/', and pauses for a moment if the database connection failed.
  def handle_processing_error(method_name, msg, op, error)
    log "Error in ##{method_name} (#{msg.seq}, #{op.uri}): #{error}"
    log error.backtrace.reject { |x| x.include?('/ruby/') }

    sleep DB_RETRY_DELAY if error.is_a?(ActiveRecord::ConnectionFailed)
  end
end