Don't forget to lycansubscribe
1require 'skyfall'
2
3require_relative 'init'
4require_relative 'models/import_job'
5require_relative 'models/post'
6require_relative 'models/subscription'
7require_relative 'models/user'
8
9class FirehoseClient
10 attr_accessor :start_cursor, :service
11
12 DEFAULT_RELAY = 'bsky.network'
13
14 def initialize
15 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
16 @service = DEFAULT_RELAY
17 end
18
19 def start
20 return if @sky
21
22 @active_users = load_users
23
24 log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})"
25
26 last_cursor = load_or_init_cursor
27 cursor = @start_cursor || last_cursor
28
29 @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
30 @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
31 @sky.check_heartbeat = true
32
33 @sky.on_message do |m|
34 start_time = Time.now
35 diff = start_time - @last_update
36
37 if diff > 30
38 log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}"
39 end
40
41 @last_update = start_time
42 process_message(m)
43 end
44
45 @sky.on_connecting { |u| log "Connecting to #{u}..." }
46 @sky.on_connect {
47 log "Connected ✓"
48
49 @replaying = true
50 @last_update = Time.now
51
52 @live_check_timer ||= EM::PeriodicTimer.new(20) do
53 now = Time.now
54 diff = now - @last_update
55
56 if diff > 30
57 log "Timer: last update #{sprintf('%.1f', diff)}s ago"
58 end
59 end
60
61 @jobs_timer ||= EM::PeriodicTimer.new(3) do
62 ImportJob.all.each do |job|
63 @active_users[job.user.did] = job.user
64 job.create_imports
65 job.destroy
66 end
67 end
68 }
69
70 @sky.on_disconnect {
71 log "Disconnected."
72 }
73
74 @sky.on_reconnect {
75 log "Connection lost, reconnecting..."
76
77 @timer&.cancel
78 @timer = nil
79 }
80
81 @sky.on_timeout {
82 log "Trying to reconnect..."
83 }
84
85 @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
86
87 @sky.connect
88 end
89
90 def stop
91 save_cursor(@sky.cursor) unless @sky.nil?
92
93 @sky&.disconnect
94 @sky = nil
95 end
96
97 def load_or_init_cursor
98 if sub = Subscription.find_by(service: @service)
99 sub.cursor
100 else
101 Subscription.create!(service: @service, cursor: 0)
102 nil
103 end
104 end
105
106 def save_cursor(cursor)
107 Subscription.where(service: @service).update_all(cursor: cursor)
108 end
109
110 def load_users
111 User.active.map { |u| [u.did, u] }.then { |list| Hash[list] }
112 end
113
114 def process_message(msg)
115 save_cursor(msg.seq) if msg.seq % 1000 == 0
116
117 case msg.type
118 when :info
119 log "InfoMessage: #{msg}"
120 when :account
121 process_account_event(msg)
122 when :commit
123 if @replaying
124 log "Replaying events since #{msg.time.getlocal} -->"
125 @replaying = false
126 end
127
128 @current_user = @active_users[msg.repo]
129
130 msg.operations.each do |op|
131 case op.type
132 when :bsky_like
133 process_like(msg, op)
134 when :bsky_repost
135 process_repost(msg, op)
136 when :bsky_post
137 process_post(msg, op)
138 end
139 end
140 end
141 end
142
143 def process_account_event(msg)
144 if msg.status == :deleted
145 if user = User.find_by(did: msg.repo)
146 user.destroy
147 @active_users.delete_if { |u| u.id == user.id }
148 end
149 end
150 end
151
152 def process_like(msg, op)
153 return unless @current_user
154
155 if op.action == :create
156 @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
157 elsif op.action == :delete
158 @current_user.likes.where(rkey: op.rkey).delete_all
159 end
160 end
161
162 def process_repost(msg, op)
163 return unless @current_user
164
165 if op.action == :create
166 @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
167 elsif op.action == :delete
168 @current_user.reposts.where(rkey: op.rkey).delete_all
169 end
170 end
171
172 def process_post(msg, op)
173 if op.action == :create
174 if @current_user
175 @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
176 @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
177 end
178 elsif op.action == :delete
179 if @current_user
180 @current_user.quotes.where(rkey: op.rkey).delete_all
181 @current_user.pins.where(rkey: op.rkey).delete_all
182 end
183
184 if post = Post.find_by_at_uri(op.uri)
185 post.destroy
186 end
187 end
188 end
189
190 def log(text)
191 puts "[#{Time.now}] #{text}"
192 end
193
194 def inspect
195 vars = instance_variables - [:@jobs_timer, :@live_check_timer]
196 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
197 "#<#{self.class}:0x#{object_id} #{values}>"
198 end
199end