Don't forget to lycansubscribe
require 'skyfall'

require_relative 'init'
require_relative 'models/import_job'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'models/user'

# Consumes a relay firehose through Skyfall and mirrors the likes, reposts,
# quotes and pins of the tracked ("active") users into the local database.
#
# The last processed cursor is persisted per service in a Subscription record,
# so that a restarted process can resume from where it stopped.
class FirehoseClient
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = DEFAULT_RELAY
  end

  # Opens the firehose connection and installs all callbacks.
  # Does nothing if a connection object already exists.
  #
  # The stream starts from +start_cursor+ when it is set, otherwise from the
  # cursor stored for the current service (which may be nil).
  def start
    return if @sky

    @active_users = load_users

    # RubyVM::YJIT is not defined on every Ruby build, so check before calling it.
    yjit_enabled = defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled?
    log "Starting firehose process (YJIT = #{yjit_enabled ? 'on' : 'off'})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
    @sky.check_heartbeat = true

    @sky.on_message do |m|
      start_time = Time.now
      diff = start_time - @last_update

      # Report when the stream resumes after a gap of more than 30 seconds.
      if diff > 30
        log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}"
      end

      @last_update = start_time
      process_message(m)
    end

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect {
      log "Connected ✓"

      @replaying = true
      @last_update = Time.now

      # `||=` keeps a reconnect from stacking a second timer on top of a running one.
      @live_check_timer ||= EM::PeriodicTimer.new(20) do
        diff = Time.now - @last_update

        if diff > 30
          log "Timer: last update #{sprintf('%.1f', diff)}s ago"
        end
      end

      # Every 3 seconds, pick up pending import jobs: start tracking the job's
      # user in the live stream, create its imports and remove the job.
      @jobs_timer ||= EM::PeriodicTimer.new(3) do
        ImportJob.all.each do |job|
          @active_users[job.user.did] = job.user
          job.create_imports
          job.destroy
        end
      end
    }

    @sky.on_disconnect {
      log "Disconnected."
    }

    @sky.on_reconnect {
      log "Connection lost, reconnecting..."

      # Pause the liveness check while the connection is down;
      # on_connect creates a fresh timer once we are connected again.
      @live_check_timer&.cancel
      @live_check_timer = nil
    }

    @sky.on_timeout {
      log "Trying to reconnect..."
    }

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }

    @sky.connect
  end

  # Persists the connection's current cursor (if there is a connection),
  # disconnects and forgets the connection so that #start can be called again.
  def stop
    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for the current service. When no Subscription
  # record exists yet, creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    if (sub = Subscription.find_by(service: @service))
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores +cursor+ on every Subscription record of the current service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Builds the lookup table of active users, keyed by DID.
  #
  # @return [Hash] DID => User
  def load_users
    User.active.to_h { |u| [u.did, u] }
  end

  # Dispatches a single firehose message by type and periodically persists
  # the cursor (on every sequence number divisible by 1000).
  def process_message(msg)
    # Guard against messages without a sequence number
    # (NOTE(review): presumably :info messages carry none - confirm against Skyfall).
    seq = msg.seq
    save_cursor(seq) if seq && (seq % 1000).zero?

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      # Log once, on the first commit received after (re)connecting.
      if @replaying
        log "Replaying events since #{msg.time.getlocal} -->"
        @replaying = false
      end

      # nil when the repo does not belong to a tracked user
      @current_user = @active_users[msg.repo]

      msg.operations.each do |op|
        case op.type
        when :bsky_like
          process_like(msg, op)
        when :bsky_repost
          process_repost(msg, op)
        when :bsky_post
          process_post(msg, op)
        end
      end
    end
  end

  # Handles an account event: when the account was deleted, destroys the
  # matching local user and stops tracking it.
  def process_account_event(msg)
    return unless msg.status == :deleted

    user = User.find_by(did: msg.repo)
    return unless user

    user.destroy

    # @active_users is keyed by DID, so remove the entry by its key.
    @active_users.delete(user.did)
  end

  # Imports a newly created like of the current user, or removes a deleted one.
  # Ignored when the commit's repo is not a tracked user.
  def process_like(msg, op)
    return unless @current_user

    case op.action
    when :create
      @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
    when :delete
      @current_user.likes.where(rkey: op.rkey).delete_all
    end
  end

  # Imports a newly created repost of the current user, or removes a deleted one.
  # Ignored when the commit's repo is not a tracked user.
  def process_repost(msg, op)
    return unless @current_user

    case op.action
    when :create
      @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
    when :delete
      @current_user.reposts.where(rkey: op.rkey).delete_all
    end
  end

  # Handles a post operation. For tracked users, the post record is passed to
  # both the quotes and the pins importers (on create) or removed from both
  # (on delete). A deleted post is also removed from the Post table,
  # regardless of whether its author is tracked.
  def process_post(msg, op)
    case op.action
    when :create
      if @current_user
        @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
        @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
      end
    when :delete
      if @current_user
        @current_user.quotes.where(rkey: op.rkey).delete_all
        @current_user.pins.where(rkey: op.rkey).delete_all
      end

      post = Post.find_by_at_uri(op.uri)
      post&.destroy
    end
  end

  # Prints +text+ to standard output, prefixed with the current time.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Like the default #inspect, but leaves the timer objects out of the output.
  def inspect
    vars = instance_variables - [:@jobs_timer, :@live_check_timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end
end