Don't forget to lycansubscribe
1require 'skyfall'
2
3require_relative 'init'
4require_relative 'models/import_job'
5require_relative 'models/post'
6require_relative 'models/subscription'
7require_relative 'models/user'
8
9class FirehoseClient
10 attr_accessor :start_cursor, :service
11
12 DEFAULT_RELAY = 'bsky.network'
13
14 def initialize
15 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
16
17 if ENV['RELAY_HOST']
18 @service = ENV['RELAY_HOST']
19 elsif ENV['JETSTREAM_HOST']
20 @service = ENV['JETSTREAM_HOST']
21 @jetstream = true
22 else
23 @service = DEFAULT_RELAY
24 end
25 end
26
27 def start
28 return if @sky
29
30 @active_users = load_users
31
32 log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})"
33
34 last_cursor = load_or_init_cursor
35 cursor = @start_cursor || last_cursor
36
37 @sky = if @jetstream
38 Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
39 else
40 Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
41 end
42
43 @sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
44 @sky.check_heartbeat = true
45
46 @sky.on_message do |m|
47 start_time = Time.now
48 diff = start_time - @last_update
49
50 if diff > 30
51 log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}"
52 end
53
54 @last_update = start_time
55 process_message(m)
56 end
57
58 @sky.on_connecting { |u| log "Connecting to #{u}..." }
59 @sky.on_connect {
60 log "Connected ✓"
61
62 @replaying = true
63 @last_update = Time.now
64
65 @live_check_timer ||= EM::PeriodicTimer.new(20) do
66 now = Time.now
67 diff = now - @last_update
68
69 if diff > 30
70 log "Timer: last update #{sprintf('%.1f', diff)}s ago"
71 end
72 end
73
74 @jobs_timer ||= EM::PeriodicTimer.new(3) do
75 ImportJob.all.each do |job|
76 @active_users[job.user.did] = job.user
77 job.create_imports
78 job.destroy
79 end
80 end
81 }
82
83 @sky.on_disconnect {
84 log "Disconnected."
85 }
86
87 @sky.on_reconnect {
88 log "Connection lost, reconnecting..."
89
90 @timer&.cancel
91 @timer = nil
92 }
93
94 @sky.on_timeout {
95 log "Trying to reconnect..."
96 }
97
98 @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
99
100 @sky.connect
101 end
102
103 def stop
104 save_cursor(@sky.cursor) unless @sky.nil?
105
106 @sky&.disconnect
107 @sky = nil
108 end
109
110 def load_or_init_cursor
111 if sub = Subscription.find_by(service: @service)
112 sub.cursor
113 else
114 Subscription.create!(service: @service, cursor: 0)
115 nil
116 end
117 end
118
119 def save_cursor(cursor)
120 Subscription.where(service: @service).update_all(cursor: cursor)
121 end
122
123 def load_users
124 User.active.map { |u| [u.did, u] }.then { |list| Hash[list] }
125 end
126
127 def process_message(msg)
128 save_cursor(msg.seq) if msg.seq % 1000 == 0
129
130 case msg.type
131 when :info
132 log "InfoMessage: #{msg}"
133 when :account
134 process_account_event(msg)
135 when :commit
136 if @replaying
137 log "Replaying events since #{msg.time.getlocal} -->"
138 @replaying = false
139 end
140
141 @current_user = @active_users[msg.repo]
142
143 msg.operations.each do |op|
144 case op.type
145 when :bsky_like
146 process_like(msg, op)
147 when :bsky_repost
148 process_repost(msg, op)
149 when :bsky_post
150 process_post(msg, op)
151 end
152 end
153 end
154 rescue CBOR::UnpackError
155 # ignore invalid records
156 end
157
158 def process_account_event(msg)
159 if msg.status == :deleted
160 if user = User.find_by(did: msg.repo)
161 user.destroy
162 @active_users.delete_if { |u| u.id == user.id }
163 end
164 end
165 end
166
167 def process_like(msg, op)
168 return unless @current_user
169
170 if op.action == :create
171 return if op.raw_record.nil?
172 @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
173 elsif op.action == :delete
174 @current_user.likes.where(rkey: op.rkey).delete_all
175 end
176 end
177
178 def process_repost(msg, op)
179 return unless @current_user
180
181 if op.action == :create
182 return if op.raw_record.nil?
183 @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
184 elsif op.action == :delete
185 @current_user.reposts.where(rkey: op.rkey).delete_all
186 end
187 end
188
189 def process_post(msg, op)
190 if op.action == :create
191 return if op.raw_record.nil?
192
193 if @current_user
194 @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
195 @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
196 end
197 elsif op.action == :delete
198 if @current_user
199 @current_user.quotes.where(rkey: op.rkey).delete_all
200 @current_user.pins.where(rkey: op.rkey).delete_all
201 end
202
203 if post = Post.find_by_at_uri(op.uri)
204 post.destroy
205 end
206 end
207 end
208
209 def log(text)
210 puts "[#{Time.now}] #{text}"
211 end
212
213 def inspect
214 vars = instance_variables - [:@jobs_timer, :@live_check_timer]
215 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
216 "#<#{self.class}:0x#{object_id} #{values}>"
217 end
218end