Don't forget to lycansubscribe
1require 'skyfall'
2
3require_relative 'init'
4require_relative 'models/import_job'
5require_relative 'models/post'
6require_relative 'models/subscription'
7require_relative 'models/user'
8
# Consumes a Skyfall event stream (a relay firehose or a Jetstream server)
# and forwards like / repost / post operations of known active users to the
# corresponding model associations. The stream position is persisted in a
# Subscription row keyed by the service host name.
class FirehoseClient
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  # Seconds without an incoming message after which the gap is logged.
  SILENCE_THRESHOLD = 30

  # Period (seconds) of the timer that reports a silent connection.
  LIVE_CHECK_INTERVAL = 20

  # Period (seconds) of the timer that picks up pending ImportJob records.
  JOBS_POLL_INTERVAL = 3

  # The cursor is written to the database whenever seq is a multiple of this.
  CURSOR_SAVE_INTERVAL = 1000

  # Picks the service host from the environment: RELAY_HOST wins over
  # JETSTREAM_HOST (which also switches the client to Jetstream mode);
  # with neither set, DEFAULT_RELAY is used.
  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym

    if ENV['RELAY_HOST']
      @service = ENV['RELAY_HOST']
    elsif ENV['JETSTREAM_HOST']
      @service = ENV['JETSTREAM_HOST']
      @jetstream = true
    else
      @service = DEFAULT_RELAY
    end
  end

  # Loads the active users, builds the Skyfall stream, installs all event
  # handlers and connects. Does nothing when a stream already exists.
  # An explicitly assigned start_cursor takes precedence over the stored one.
  def start
    return if @sky

    @active_users = load_users

    log "Starting firehose process (YJIT = #{yjit_enabled? ? 'on' : 'off'})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = build_stream(cursor)

    agent = ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)"
    @sky.user_agent = "#{agent} #{@sky.version_string}"
    @sky.check_heartbeat = true

    register_handlers

    @sky.connect
  end

  # Persists the stream's current cursor, disconnects and forgets the stream
  # so that a later #start builds a fresh one.
  # NOTE(review): the periodic timers created on connect are not cancelled
  # here — confirm whether they are meant to outlive the stream.
  def stop
    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  # Returns the stored cursor for the current service. When no Subscription
  # row exists yet, creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Writes the given cursor to the Subscription row(s) of the current service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Returns a Hash mapping DID => user for every record in User.active.
  def load_users
    User.active.to_h { |u| [u.did, u] }
  end

  # Entry point for every stream message: periodically checkpoints the
  # cursor, then dispatches on the message type (:info, :account, :commit).
  # Other message types are ignored.
  def process_message(msg)
    seq = msg.seq

    # Guard against a message without a sequence number; `nil % n` would
    # raise. Presumably only some non-commit frames lack it — TODO confirm.
    save_cursor(seq) if seq && (seq % CURSOR_SAVE_INTERVAL).zero?

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      process_commit(msg)
    end
  end

  # Handles an account status event. Only the :deleted status is acted on:
  # the matching User (if any) is destroyed and removed from the in-memory
  # active users cache.
  def process_account_event(msg)
    return unless msg.status == :deleted

    user = User.find_by(did: msg.repo)
    return unless user

    user.destroy

    # @active_users is keyed by DID, so remove the entry by its key.
    # (Iterating the hash with a one-argument block does not yield a user
    # object, so comparing `id` inside such a block fails.)
    @active_users.delete(user.did)
  end

  # Applies a like create/delete operation for the current user, if any.
  def process_like(msg, op)
    return unless @current_user

    apply_record_op(@current_user.likes, op)
  end

  # Applies a repost create/delete operation for the current user, if any.
  def process_repost(msg, op)
    return unless @current_user

    apply_record_op(@current_user.reposts, op)
  end

  # Applies a post create/delete operation to the current user's quotes and
  # pins (when there is a current user). On delete, additionally destroys
  # the Post found by the operation's AT URI, regardless of who authored it.
  def process_post(msg, op)
    if @current_user
      apply_record_op(@current_user.quotes, op)
      apply_record_op(@current_user.pins, op)
    end

    return unless op.action == :delete

    Post.find_by_at_uri(op.uri)&.destroy
  end

  # Prints a timestamped line to standard output.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Compact inspect output that leaves out the two periodic timer objects.
  def inspect
    vars = instance_variables - [:@jobs_timer, :@live_check_timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end

  private

  # True only when the RubyVM::YJIT constant exists and reports enabled;
  # avoids a NameError on Rubies that do not define it.
  def yjit_enabled?
    !!(defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled?)
  end

  # Builds the Skyfall stream object for the configured mode.
  def build_stream(cursor)
    if @jetstream
      Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
    else
      Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    end
  end

  # Installs all callbacks on the stream object.
  def register_handlers
    @sky.on_message { |m| handle_incoming(m) }

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect do
      log "Connected ✓"

      @replaying = true
      @last_update = Time.now

      start_timers
    end

    @sky.on_disconnect { log "Disconnected." }

    @sky.on_reconnect do
      log "Connection lost, reconnecting..."

      # NOTE(review): nothing visible in this class assigns @timer, so this
      # looks like a no-op left over from an earlier version — confirm
      # before removing.
      @timer&.cancel
      @timer = nil
    end

    @sky.on_timeout { log "Trying to reconnect..." }

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
  end

  # Logs when messages resume after a long gap, records the arrival time and
  # hands the message to #process_message.
  def handle_incoming(m)
    start_time = Time.now
    diff = start_time - @last_update

    if diff > SILENCE_THRESHOLD
      log "Receiving messages again after #{format('%.1f', diff)}s, starting from #{m.time.getlocal}"
    end

    @last_update = start_time
    process_message(m)
  end

  # Creates the two periodic timers once; later connects reuse them.
  def start_timers
    @live_check_timer ||= EM::PeriodicTimer.new(LIVE_CHECK_INTERVAL) { report_silence }
    @jobs_timer ||= EM::PeriodicTimer.new(JOBS_POLL_INTERVAL) { run_import_jobs }
  end

  # Logs how long the stream has been silent once the gap exceeds the threshold.
  def report_silence
    diff = Time.now - @last_update

    log "Timer: last update #{format('%.1f', diff)}s ago" if diff > SILENCE_THRESHOLD
  end

  # For each pending ImportJob: starts tracking its user, lets the job
  # create its imports, then removes the job.
  def run_import_jobs
    ImportJob.all.each do |job|
      @active_users[job.user.did] = job.user
      job.create_imports
      job.destroy
    end
  end

  # Handles a commit message: announces the start of replay once per
  # connection, resolves the current user from the repo DID and dispatches
  # every operation by its collection type.
  def process_commit(msg)
    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    @current_user = @active_users[msg.repo]

    msg.operations.each do |op|
      case op.type
      when :bsky_like
        process_like(msg, op)
      when :bsky_repost
        process_repost(msg, op)
      when :bsky_post
        process_post(msg, op)
      end
    end
  end

  # Shared create/delete handling for one association: imports the record
  # on :create, deletes rows with the matching rkey on :delete, and ignores
  # any other action.
  def apply_record_op(collection, op)
    case op.action
    when :create
      collection.import_from_record(op.uri, op.raw_record, queue: :firehose)
    when :delete
      collection.where(rkey: op.rkey).delete_all
    end
  end
end