Don't forget to lycansubscribe
1require 'skyfall' 2 3require_relative 'init' 4require_relative 'models/post' 5require_relative 'models/subscription' 6require_relative 'models/user' 7 8class FirehoseClient 9 attr_accessor :start_cursor, :service 10 11 DEFAULT_RELAY = 'bsky.network' 12 13 def initialize 14 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym 15 @service = DEFAULT_RELAY 16 end 17 18 def start 19 return if @sky 20 21 @active_users = load_users 22 23 log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})" 24 25 last_cursor = load_or_init_cursor 26 cursor = @start_cursor || last_cursor 27 28 @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor) 29 @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}" 30 @sky.check_heartbeat = true 31 32 @sky.on_message do |m| 33 start_time = Time.now 34 diff = start_time - @last_update 35 36 if diff > 30 37 log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}" 38 end 39 40 @last_update = start_time 41 process_message(m) 42 end 43 44 @sky.on_connecting { |u| log "Connecting to #{u}..." } 45 @sky.on_connect { 46 log "Connected ✓" 47 48 @replaying = true 49 @last_update = Time.now 50 51 @timer ||= EM::PeriodicTimer.new(20) do 52 now = Time.now 53 diff = now - @last_update 54 55 if diff > 30 56 log "Timer: last update #{sprintf('%.1f', diff)}s ago" 57 end 58 end 59 } 60 61 @sky.on_disconnect { 62 log "Disconnected." 63 } 64 65 @sky.on_reconnect { 66 log "Connection lost, reconnecting..." 67 68 @timer&.cancel 69 @timer = nil 70 } 71 72 @sky.on_timeout { 73 log "Trying to reconnect..." 74 } 75 76 @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" } 77 78 @sky.connect 79 end 80 81 def stop 82 save_cursor(@sky.cursor) unless @sky.nil? 83 84 @sky&.disconnect 85 @sky = nil 86 end 87 88 def load_or_init_cursor 89 if sub = Subscription.find_by(service: @service) 90 sub.cursor 91 else 92 Subscription.create!(service: @service, cursor: 0) 93 nil 94 end 95 end 96 97 def save_cursor(cursor) 98 Subscription.where(service: @service).update_all(cursor: cursor) 99 end 100 101 def load_users 102 User.active.map { |u| [u.did, u] }.then { |list| Hash[list] } 103 end 104 105 def process_message(msg) 106 save_cursor(msg.seq) if msg.seq % 1000 == 0 107 108 case msg.type 109 when :info 110 log "InfoMessage: #{msg}" 111 when :account 112 process_account_event(msg) 113 when :commit 114 if @replaying 115 log "Replaying events since #{msg.time.getlocal} -->" 116 @replaying = false 117 end 118 119 @current_user = @active_users[msg.repo] 120 121 msg.operations.each do |op| 122 case op.type 123 when :bsky_like 124 process_like(msg, op) 125 when :bsky_repost 126 process_repost(msg, op) 127 when :bsky_post 128 process_post(msg, op) 129 end 130 end 131 end 132 end 133 134 def process_account_event(msg) 135 if msg.status == :deleted 136 # ... 137 end 138 end 139 140 def process_like(msg, op) 141 return unless @current_user 142 143 if op.action == :create 144 @current_user.likes.import_from_record(op.uri, op.raw_record) 145 elsif op.action == :delete 146 @current_user.likes.where(rkey: op.rkey).delete_all 147 end 148 end 149 150 def process_repost(msg, op) 151 return unless @current_user 152 153 if op.action == :create 154 @current_user.reposts.import_from_record(op.uri, op.raw_record) 155 elsif op.action == :delete 156 @current_user.reposts.where(rkey: op.rkey).delete_all 157 end 158 end 159 160 def process_post(msg, op) 161 if op.action == :create 162 if @current_user 163 @current_user.quotes.import_from_record(op.uri, op.raw_record) 164 @current_user.pins.import_from_record(op.uri, op.raw_record) 165 end 166 elsif op.action == :delete 167 if @current_user 168 @current_user.quotes.where(rkey: op.rkey).delete_all 169 @current_user.pins.where(rkey: op.rkey).delete_all 170 end 171 172 if post = Post.find_by_at_uri(op.uri) 173 post.destroy 174 end 175 end 176 end 177 178 def log(text) 179 puts "[#{Time.now}] #{text}" 180 end 181 182 def inspect 183 vars = instance_variables - [:@timer] 184 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 185 "#<#{self.class}:0x#{object_id} #{values}>" 186 end 187end