Don't forget to lycansubscribe
require 'skyfall'

require_relative 'init'
require_relative 'models/subscription'

# Streams events from a firehose relay through Skyfall::Firehose, logs the
# connection lifecycle, and persists the stream position ("cursor") in the
# Subscription model, keyed by the relay hostname.
#
# The callbacks registered in #start rely on EventMachine (EM::PeriodicTimer),
# so the client is presumably meant to run inside an EM reactor — confirm
# against the caller.
class FirehoseClient
  # start_cursor: optional cursor that overrides the one stored in the database.
  # service: hostname of the relay to connect to (defaults to DEFAULT_RELAY).
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  def initialize
    # NOTE(review): @env is not read anywhere in the visible code.
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = DEFAULT_RELAY
  end

  # Creates the Skyfall connection, registers all callbacks and connects.
  # Does nothing if a connection object already exists.
  def start
    return if @sky

    # RubyVM::YJIT is only defined on Rubies built with YJIT support, so guard
    # the lookup instead of raising NameError on other builds/implementations.
    yjit_on = defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled?
    log "Starting firehose process (YJIT = #{yjit_on ? 'on' : 'off'})"

    # Always call load_or_init_cursor (it creates the Subscription row on the
    # first run), but let an explicitly assigned start_cursor take precedence.
    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
    @sky.check_heartbeat = true

    @sky.on_message do |m|
      start_time = Time.now
      diff = start_time - @last_update

      # Report when the stream resumes after a gap of more than 30 seconds.
      if diff > 30
        log "Receiving messages again after #{format('%.1f', diff)}s, starting from #{m.time.getlocal}"
      end

      @last_update = start_time
      process_message(m)
    end

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect do
      log "Connected ✓"

      # The first commit received after (re)connecting logs a "Replaying" line.
      @replaying = true
      @last_update = Time.now

      # Periodic check that reports when no message has arrived for over 30s.
      # Created once; it is dropped on reconnect and in #stop.
      @timer ||= EM::PeriodicTimer.new(20) do
        diff = Time.now - @last_update

        if diff > 30
          log "Timer: last update #{format('%.1f', diff)}s ago"
        end
      end
    end

    @sky.on_disconnect do
      log "Disconnected."
    end

    @sky.on_reconnect do
      log "Connection lost, reconnecting..."

      cancel_timer
    end

    @sky.on_timeout do
      log "Trying to reconnect..."
    end

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }

    @sky.connect
  end

  # Saves the current cursor, disconnects and forgets the connection object.
  # Safe to call when the client was never started.
  def stop
    # Cancel the periodic timer as well; otherwise it would keep firing and
    # logging "Timer: last update ..." after the client has been stopped.
    cancel_timer

    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for the current service. If there is no
  # Subscription record for it yet, creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    sub = Subscription.find_by(service: @service)
    return sub.cursor if sub

    Subscription.create!(service: @service, cursor: 0)
    nil
  end

  # Writes the given cursor to the Subscription record(s) of the current service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles a single firehose message, dispatching on its type.
  # The cursor is persisted whenever the sequence number is a multiple of 1000.
  def process_message(msg)
    save_cursor(msg.seq) if msg.seq % 1000 == 0

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      # Log once per connection where the received events start from.
      if @replaying
        log "Replaying events since #{msg.time.getlocal} -->"
        @replaying = false
      end

      msg.operations.each do |op|
        case op.type
        when :bsky_post
          # ...
        end
      end
    end
  end

  # Handles an :account message; only the :deleted status is singled out.
  def process_account_event(msg)
    if msg.status == :deleted
      # ...
    end
  end

  # Prints a timestamped line to standard output.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Debug representation listing all instance variables except @timer.
  def inspect
    vars = instance_variables - [:@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    # Render the id in hexadecimal so that it matches the "0x" prefix.
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end

  private

  # Cancels the periodic "last update" timer, if one is running.
  def cancel_timer
    @timer&.cancel
    @timer = nil
  end
end