Don't forget to lycansubscribe
1require 'skyfall'
2
3require_relative 'init'
4require_relative 'models/post'
5require_relative 'models/subscription'
6require_relative 'models/user'
7
# Consumes repository events from a relay firehose through Skyfall and mirrors
# the likes, reposts, quotes and pins of tracked users into the local database.
#
# The last processed sequence number is stored in a Subscription row keyed by
# the relay hostname; #start reads it back and passes it to Skyfall as the
# starting cursor.
class FirehoseClient
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  # Persist the cursor whenever the sequence number is a multiple of this.
  CURSOR_SAVE_INTERVAL = 1000

  # How often (seconds) the watchdog timer checks for a stalled stream.
  WATCHDOG_INTERVAL = 20

  # A gap between messages longer than this (seconds) is worth logging.
  STALL_THRESHOLD = 30

  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = DEFAULT_RELAY
  end

  # Loads the tracked users, builds the Skyfall client, registers all of its
  # callbacks and connects. Does nothing if a client already exists.
  #
  # An explicitly assigned #start_cursor takes precedence over the cursor
  # stored in the database.
  def start
    return if @sky

    @active_users = load_users

    log "Starting firehose process (YJIT = #{yjit_enabled? ? 'on' : 'off'})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
    @sky.check_heartbeat = true

    @sky.on_message do |m|
      start_time = Time.now
      # @last_update is normally set in on_connect; fall back to "now" so a
      # message that somehow arrives first yields a zero gap instead of a crash.
      diff = start_time - (@last_update || start_time)

      if diff > STALL_THRESHOLD
        # NOTE(review): presumably not every frame type carries a timestamp,
        # hence the safe navigation — confirm against Skyfall's message classes.
        log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time&.getlocal}"
      end

      @last_update = start_time
      process_message(m)
    end

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect do
      log "Connected ✓"

      @replaying = true
      @last_update = Time.now

      start_watchdog
    end

    @sky.on_disconnect { log "Disconnected." }

    @sky.on_reconnect do
      log "Connection lost, reconnecting..."
      cancel_watchdog
    end

    @sky.on_timeout { log "Trying to reconnect..." }

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }

    @sky.connect
  end

  # Cancels the watchdog timer, persists the client's current cursor and
  # disconnects. Safe to call when the client was never started.
  def stop
    # Without this the periodic timer outlives the connection and keeps
    # logging "last update ... ago" after shutdown.
    cancel_watchdog

    return if @sky.nil?

    save_cursor(@sky.cursor)
    @sky.disconnect
    @sky = nil
  end

  # Returns the cursor stored for the current service. If no Subscription row
  # exists yet, creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    if (sub = Subscription.find_by(service: @service))
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Writes +cursor+ to every Subscription row matching the current service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Returns a Hash mapping DID => user for every record in User.active.
  def load_users
    User.active.to_h { |u| [u.did, u] }
  end

  # Dispatches a single firehose message by type.
  #
  # The cursor is checkpointed on every CURSOR_SAVE_INTERVAL-th sequence
  # number. Commit messages have each operation routed to the matching
  # process_* handler; other operation types are ignored.
  def process_message(msg)
    seq = msg.seq
    # A message without a sequence number must not crash the dispatcher.
    save_cursor(seq) if seq && (seq % CURSOR_SAVE_INTERVAL).zero?

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      if @replaying
        log "Replaying events since #{msg.time.getlocal} -->"
        @replaying = false
      end

      # nil when the committing repo is not one of the tracked users.
      @current_user = @active_users[msg.repo]

      msg.operations.each do |op|
        case op.type
        when :bsky_like   then process_like(msg, op)
        when :bsky_repost then process_repost(msg, op)
        when :bsky_post   then process_post(msg, op)
        end
      end
    end
  end

  # Handles an account status event. When the account was deleted, destroys
  # the matching User row (if any) and evicts it from the in-memory lookup so
  # later commits are not applied to a destroyed record.
  def process_account_event(msg)
    return unless msg.status == :deleted

    User.find_by(did: msg.repo)&.destroy
    @active_users&.delete(msg.repo)
  end

  # Applies a like create/delete operation for the current tracked user.
  def process_like(msg, op)
    return unless @current_user

    sync_records(@current_user.likes, op)
  end

  # Applies a repost create/delete operation for the current tracked user.
  def process_repost(msg, op)
    return unless @current_user

    sync_records(@current_user.reposts, op)
  end

  # Applies a post operation: for a tracked user the post is offered to both
  # the quotes and pins collections; on deletion the stored Post with that
  # URI (from any author) is destroyed as well.
  def process_post(msg, op)
    if @current_user
      sync_records(@current_user.quotes, op)
      sync_records(@current_user.pins, op)
    end

    return unless op.action == :delete

    Post.find_by_at_uri(op.uri)&.destroy
  end

  # Prints +text+ to stdout prefixed with the current time.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Lists all instance variables except the timer.
  # NOTE(review): object_id is printed in decimal despite the "0x" prefix;
  # kept as-is to avoid changing existing output.
  def inspect
    vars = instance_variables - [:@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end

  private

  # True only when this Ruby build exposes YJIT and it is switched on.
  def yjit_enabled?
    defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled?
  end

  # Starts (once) a periodic timer that logs when no message has arrived for
  # longer than STALL_THRESHOLD seconds.
  def start_watchdog
    @timer ||= EM::PeriodicTimer.new(WATCHDOG_INTERVAL) do
      diff = Time.now - @last_update

      if diff > STALL_THRESHOLD
        log "Timer: last update #{sprintf('%.1f', diff)}s ago"
      end
    end
  end

  # Cancels the watchdog timer, if one is running.
  def cancel_watchdog
    @timer&.cancel
    @timer = nil
  end

  # Shared create/delete handling for a per-user record collection: imports
  # the record on :create, removes rows with the same rkey on :delete, and
  # ignores any other action.
  def sync_records(collection, op)
    case op.action
    when :create
      collection.import_from_record(op.uri, op.raw_record)
    when :delete
      collection.where(rkey: op.rkey).delete_all
    end
  end
end