Don't forget to lycansubscribe
1require 'skyfall' 2 3require_relative 'init' 4require_relative 'models/import_job' 5require_relative 'models/post' 6require_relative 'models/subscription' 7require_relative 'models/user' 8 9class FirehoseClient 10 attr_accessor :start_cursor, :service 11 12 DEFAULT_RELAY = 'bsky.network' 13 14 def initialize 15 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym 16 17 if ENV['RELAY_HOST'] 18 @service = ENV['RELAY_HOST'] 19 elsif ENV['JETSTREAM_HOST'] 20 @service = ENV['JETSTREAM_HOST'] 21 @jetstream = true 22 else 23 @service = DEFAULT_RELAY 24 end 25 end 26 27 def start 28 return if @sky 29 30 @active_users = load_users 31 32 log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})" 33 34 last_cursor = load_or_init_cursor 35 cursor = @start_cursor || last_cursor 36 37 @sky = if @jetstream 38 Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] }) 39 else 40 Skyfall::Firehose.new(@service, :subscribe_repos, cursor) 41 end 42 43 @sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string 44 @sky.check_heartbeat = true 45 46 @sky.on_message do |m| 47 start_time = Time.now 48 diff = start_time - @last_update 49 50 if diff > 30 51 log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}" 52 end 53 54 @last_update = start_time 55 process_message(m) 56 end 57 58 @sky.on_connecting { |u| log "Connecting to #{u}..." } 59 @sky.on_connect { 60 log "Connected ✓" 61 62 @replaying = true 63 @last_update = Time.now 64 65 @live_check_timer ||= EM::PeriodicTimer.new(20) do 66 now = Time.now 67 diff = now - @last_update 68 69 if diff > 30 70 log "Timer: last update #{sprintf('%.1f', diff)}s ago" 71 end 72 end 73 74 @jobs_timer ||= EM::PeriodicTimer.new(3) do 75 ImportJob.all.each do |job| 76 @active_users[job.user.did] = job.user 77 job.create_imports 78 job.destroy 79 end 80 end 81 } 82 83 @sky.on_disconnect { 84 log "Disconnected." 85 } 86 87 @sky.on_reconnect { 88 log "Connection lost, reconnecting..." 89 90 @timer&.cancel 91 @timer = nil 92 } 93 94 @sky.on_timeout { 95 log "Trying to reconnect..." 96 } 97 98 @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" } 99 100 @sky.connect 101 end 102 103 def stop 104 save_cursor(@sky.cursor) unless @sky.nil? 105 106 @sky&.disconnect 107 @sky = nil 108 end 109 110 def load_or_init_cursor 111 if sub = Subscription.find_by(service: @service) 112 sub.cursor 113 else 114 Subscription.create!(service: @service, cursor: 0) 115 nil 116 end 117 end 118 119 def save_cursor(cursor) 120 Subscription.where(service: @service).update_all(cursor: cursor) 121 end 122 123 def load_users 124 User.active.map { |u| [u.did, u] }.then { |list| Hash[list] } 125 end 126 127 def process_message(msg) 128 save_cursor(msg.seq) if msg.seq % 1000 == 0 129 130 case msg.type 131 when :info 132 log "InfoMessage: #{msg}" 133 when :account 134 process_account_event(msg) 135 when :commit 136 if @replaying 137 log "Replaying events since #{msg.time.getlocal} -->" 138 @replaying = false 139 end 140 141 @current_user = @active_users[msg.repo] 142 143 msg.operations.each do |op| 144 case op.type 145 when :bsky_like 146 process_like(msg, op) 147 when :bsky_repost 148 process_repost(msg, op) 149 when :bsky_post 150 process_post(msg, op) 151 end 152 end 153 end 154 end 155 156 def process_account_event(msg) 157 if msg.status == :deleted 158 if user = User.find_by(did: msg.repo) 159 user.destroy 160 @active_users.delete_if { |u| u.id == user.id } 161 end 162 end 163 end 164 165 def process_like(msg, op) 166 return unless @current_user 167 168 if op.action == :create 169 @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose) 170 elsif op.action == :delete 171 @current_user.likes.where(rkey: op.rkey).delete_all 172 end 173 end 174 175 def process_repost(msg, op) 176 return unless @current_user 177 178 if op.action == :create 179 @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose) 180 elsif op.action == :delete 181 @current_user.reposts.where(rkey: op.rkey).delete_all 182 end 183 end 184 185 def process_post(msg, op) 186 if op.action == :create 187 if @current_user 188 @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose) 189 @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose) 190 end 191 elsif op.action == :delete 192 if @current_user 193 @current_user.quotes.where(rkey: op.rkey).delete_all 194 @current_user.pins.where(rkey: op.rkey).delete_all 195 end 196 197 if post = Post.find_by_at_uri(op.uri) 198 post.destroy 199 end 200 end 201 end 202 203 def log(text) 204 puts "[#{Time.now}] #{text}" 205 end 206 207 def inspect 208 vars = instance_variables - [:@jobs_timer, :@live_check_timer] 209 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 210 "#<#{self.class}:0x#{object_id} #{values}>" 211 end 212end