Don't forget to lycansubscribe
1require 'skyfall'
2
3require_relative 'init'
4require_relative 'models/import_job'
5require_relative 'models/post'
6require_relative 'models/subscription'
7require_relative 'models/user'
8
# Long-running client that consumes a Skyfall stream (a relay firehose or a
# Jetstream server) and applies the like / repost / post events of the
# currently active users to their stored likes, reposts, quotes and pins.
#
# The stream host is picked from the environment: RELAY_HOST first, then
# JETSTREAM_HOST (which also switches to the Jetstream client), otherwise
# DEFAULT_RELAY.
class FirehoseClient
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  # Reads the environment name and the stream host from ENV.
  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym

    if ENV['RELAY_HOST']
      @service = ENV['RELAY_HOST']
    elsif ENV['JETSTREAM_HOST']
      @service = ENV['JETSTREAM_HOST']
      @jetstream = true
    else
      @service = DEFAULT_RELAY
    end
  end

  # Loads the active users, builds the stream client, installs every callback
  # and connects. Does nothing when a stream client already exists.
  #
  # The starting cursor is `start_cursor` when it was set by the caller,
  # otherwise the cursor stored for this service (nil on the very first run).
  #
  # NOTE(review): the connect callback creates EM::PeriodicTimer instances, so
  # this presumably has to run inside an EventMachine reactor - confirm.
  def start
    return if @sky

    @active_users = load_users

    log "Starting firehose process (YJIT = #{yjit_enabled? ? 'on' : 'off'})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = build_stream(cursor)
    @sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
    @sky.check_heartbeat = true

    install_callbacks

    @sky.connect
  end

  # Persists the stream's current cursor, disconnects and forgets the stream
  # client. Safe to call when the client was never started.
  def stop
    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for the current service. When no Subscription row
  # exists yet, one is created with cursor 0 and nil is returned, so the first
  # run starts without a cursor.
  def load_or_init_cursor
    if (sub = Subscription.find_by(service: @service))
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores +cursor+ on the Subscription row(s) of the current service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Returns a Hash of the active users keyed by their DID.
  def load_users
    User.active.map { |u| [u.did, u] }.to_h
  end

  # Handles one stream message: checkpoints the cursor on every 1000th sequence
  # number, then dispatches on the message type. Commit operations are routed
  # to #process_like, #process_repost or #process_post; other operation types
  # and other message types are ignored. CBOR::UnpackError is swallowed so an
  # invalid record does not interrupt the stream.
  def process_message(msg)
    # Guard against messages that carry no sequence number: calling `%` on nil
    # would raise NoMethodError, which the rescue below does not cover.
    seq = msg.seq
    save_cursor(seq) if seq && seq % 1000 == 0

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      # Log once after each (re)connect, on the first commit that arrives.
      if @replaying
        log "Replaying events since #{msg.time.getlocal} -->"
        @replaying = false
      end

      # nil when the repo does not belong to an active user.
      @current_user = @active_users[msg.repo]

      msg.operations.each do |op|
        case op.type
        when :bsky_like
          process_like(msg, op)
        when :bsky_repost
          process_repost(msg, op)
        when :bsky_post
          process_post(msg, op)
        end
      end
    end
  rescue CBOR::UnpackError
    # ignore invalid records
  end

  # On an account event with status :deleted, destroys the matching User (if
  # one is stored) and drops it from the in-memory active user list.
  def process_account_event(msg)
    return unless msg.status == :deleted

    user = User.find_by(did: msg.repo)
    return unless user

    user.destroy
    @active_users.delete_if { |_did, u| u.id == user.id }
  end

  # Imports a created like, or deletes a removed one, for the current user.
  # Does nothing when the commit's repo is not an active user. Errors are
  # logged and never propagate to the caller.
  def process_like(msg, op)
    return unless @current_user

    if op.action == :create
      return if op.raw_record.nil?
      @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
    elsif op.action == :delete
      @current_user.likes.where(rkey: op.rkey).delete_all
    end
  rescue StandardError => e
    log_processing_error('process_like', msg, op, e)
  end

  # Imports a created repost, or deletes a removed one, for the current user.
  # Does nothing when the commit's repo is not an active user. Errors are
  # logged and never propagate to the caller.
  def process_repost(msg, op)
    return unless @current_user

    if op.action == :create
      return if op.raw_record.nil?
      @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
    elsif op.action == :delete
      @current_user.reposts.where(rkey: op.rkey).delete_all
    end
  rescue StandardError => e
    log_processing_error('process_repost', msg, op, e)
  end

  # Handles a post operation. A created post is offered to both the quotes and
  # the pins importers of the current user. A deleted post removes the current
  # user's quote/pin with that rkey and, regardless of whether the author is an
  # active user, destroys the stored Post with that URI if there is one.
  # Errors are logged and never propagate to the caller.
  def process_post(msg, op)
    if op.action == :create
      return if op.raw_record.nil?

      if @current_user
        @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
        @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
      end
    elsif op.action == :delete
      if @current_user
        @current_user.quotes.where(rkey: op.rkey).delete_all
        @current_user.pins.where(rkey: op.rkey).delete_all
      end

      post = Post.find_by_at_uri(op.uri)
      post.destroy if post
    end
  rescue StandardError => e
    log_processing_error('process_post', msg, op, e)
  end

  # Prints +text+ to standard output, prefixed with the current time.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Compact inspect output that leaves out the two timer objects.
  def inspect
    vars = instance_variables - [:@jobs_timer, :@live_check_timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end

  private

  # True when YJIT is available in this Ruby and currently enabled. The
  # `defined?` check avoids a NameError on builds/runtimes without RubyVM::YJIT.
  def yjit_enabled?
    return false unless defined?(RubyVM::YJIT)

    RubyVM::YJIT.enabled?
  end

  # Builds the Skyfall client matching the configured service type.
  def build_stream(cursor)
    if @jetstream
      Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
    else
      Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    end
  end

  # Registers all stream callbacks on @sky.
  def install_callbacks
    @sky.on_message { |m| handle_incoming(m) }
    @sky.on_connecting { |u| log "Connecting to #{u}..." }
    @sky.on_connect { handle_connected }
    @sky.on_disconnect { log "Disconnected." }

    # The periodic timers are created once (`||=` in #handle_connected) and
    # keep running across reconnects, so there is nothing to cancel here.
    @sky.on_reconnect { log "Connection lost, reconnecting..." }

    @sky.on_timeout { log "Trying to reconnect..." }
    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
  end

  # Message callback: notes when messages resume after more than 30 seconds of
  # silence, records the arrival time and processes the message.
  def handle_incoming(msg)
    now = Time.now
    gap = now - @last_update

    if gap > 30
      log "Receiving messages again after #{sprintf('%.1f', gap)}s, starting from #{msg.time.getlocal}"
    end

    @last_update = now
    process_message(msg)
  end

  # Connect callback: resets the replay flag and the last-update time, and
  # creates the two periodic timers the first time a connection is made.
  def handle_connected
    log "Connected ✓"

    @replaying = true
    @last_update = Time.now

    @live_check_timer ||= EM::PeriodicTimer.new(20) { report_silence }
    @jobs_timer ||= EM::PeriodicTimer.new(3) { run_import_jobs }
  end

  # Logs a notice when no message has arrived for more than 30 seconds.
  def report_silence
    idle = Time.now - @last_update
    log "Timer: last update #{sprintf('%.1f', idle)}s ago" if idle > 30
  end

  # Picks up every queued ImportJob: starts tracking its user, creates the
  # imports and removes the job.
  def run_import_jobs
    ImportJob.all.each do |job|
      @active_users[job.user.did] = job.user
      job.create_imports
      job.destroy
    end
  end

  # Shared error reporting for the process_* handlers: logs the error with the
  # message sequence and record URI, then the backtrace without the lines that
  # contain '/ruby/'. Pauses for 5 seconds on ActiveRecord::ConnectionFailed
  # (presumably to back off while the database is unreachable - confirm).
  def log_processing_error(context, msg, op, error)
    log "Error in ##{context} (#{msg.seq}, #{op.uri}): #{error}"
    log(error.backtrace.reject { |line| line.include?('/ruby/') })
    sleep 5 if error.is_a?(ActiveRecord::ConnectionFailed)
  end
end