# Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'time'

require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'utils'
10
# Connects to the Bluesky firehose through a Jetstream service, watches the
# stream of new post records, matches each post against all configured feeds,
# and logs/stores posts according to the configured flags.
#
# Configurable flags (defaults depend on APP_ENV/RACK_ENV):
# - start_cursor  - explicit cursor to start streaming from
# - show_progress - print a progress dot to stdout for every processed post
# - log_status    - log connection lifecycle events
# - log_posts     - which post texts to print (:all, :matching or false)
# - save_posts    - which posts to store in the database (:all or :matching)
# - replay_events - whether to resume from the last saved cursor on restart
class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'

  # number of posts to buffer before flushing them in one batch insert (when save_posts == :all)
  POSTS_BATCH_SIZE = 100

  include Utils

  # service: hostname of the Jetstream instance (falls back to DEFAULT_JETSTREAM)
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    @show_progress = @env == :development
    @log_status = true
    @log_posts = @env == :development ? :matching : false
    @save_posts = @env == :development ? :all : :matching
    @replay_events = @env != :development

    # only feeds marked as updating get new posts assigned
    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
  end

  # Opens the websocket connection and starts processing events.
  # Does nothing if the stream is already running.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    @sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message { |m| process_message(m) }

    # NOTE: these counters must be reset on every (re)connect regardless of the
    # logging settings, because process_message relies on them; previously they
    # were only initialized when log_status was enabled, which made
    # @message_counter += 1 crash on a nil counter otherwise
    @sky.on_connect do
      @message_counter = 0
      @replaying = !cursor.nil?
      log "Connected ✓" if @log_status
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_disconnect { log "Disconnected." }

      @sky.on_reconnect { log "Connection lost, reconnecting..." }

      @sky.on_timeout { log "Trying to reconnect..." }

      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Flushes queued posts, persists the current cursor and closes the connection.
  def stop
    save_queued_posts
    save_cursor(@sky.cursor) if @sky

    @sky&.disconnect
    @sky = nil
  end

  # Returns the last saved cursor for this service, or nil if we're connecting
  # to it for the first time (in which case a subscription row is created).
  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Persists the given cursor so streaming can be resumed after a restart.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Dispatches a single firehose event by type; only :commit events
  # (record changes) are processed further below the case statement.
  def process_message(msg)
    case msg.type
    when :info
      # ATProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"

    when :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"

    when :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)

    else
      log "Unknown message type: #{msg.type} (#{msg.seq})" if msg.unknown?
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    @message_counter += 1

    # save current cursor every 100 events
    save_cursor(msg.seq) if @message_counter % 100 == 0

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts table, then add/remove records here depending on op.action
        # (you'll need to track like records and not just have a single numeric "likes" field,
        # because delete events only include the uri/rkey of the like, not of the liked post)

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Cleans up all stored data of accounts that got deleted.
  def process_account_message(msg)
    if msg.status == :deleted
      # delete all data we have stored about this account
      FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
      Post.where(repo: msg.did).delete_all
    end
  end

  # Handles a single post operation: removes deleted posts, validates and
  # filters new ones, matches them against the feeds, and logs/saves them
  # according to the log_posts/save_posts settings.
  def process_post(msg, op)
    if op.action == :delete
      if post = Post.find_by_repo_rkey(op.repo, op.rkey)
        post.destroy
      end
    end

    return unless op.action == :create

    begin
      if op.raw_record.nil?
        log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(op.raw_record['createdAt'])
      return if post_time < msg.time - 86400
    rescue StandardError => e
      log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    record = op.raw_record
    text = record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    record.delete('$type')
    record.delete('text')
    trimmed_json = JSON.generate(record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: record
    )

    if !post.valid?
      # an overlong data JSON is recoverable - try trimming it before giving up
      if post.errors.has_key?(:data)
        post.trim_too_long_data
      end

      if !post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      if feed.post_matches?(post)
        post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) if @save_posts
        matched = true
      end
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    if @save_posts == :all
      # wait until we have 100 posts and then save them all in one insert, if possible
      @post_queue << post

      save_queued_posts if @post_queue.length >= POSTS_BATCH_SIZE
    elsif @save_posts == :matching && matched
      # save immediately because matched posts might be rare; we've already checked validations
      post.save!(validate: false)
    end

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log "Error in #process_post: #{e}"

    # "nesting too deep" comes from JSON-encoding absurdly nested records;
    # a backtrace adds nothing useful in that case
    unless e.message == "nesting of 100 is too deep"
      log msg.inspect
      log e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  # Saves all posts from the queue to the database in as few queries as possible.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:

    matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 }

    if unmatched.length > 0
      values = unmatched.map { |p| p.attributes.except('id') }
      Post.insert_all(values)
    end

    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:

    ActiveRecord::Base.transaction do
      @post_queue.each do |p|
        # skip validations since we've checked the posts before adding them to the queue
        p.save!(validate: false)
      end
    end

    @post_queue = []
  rescue StandardError
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:

    @post_queue.each do |p|
      begin
        ActiveRecord::Base.transaction do
          p.save!(validate: false)
        end
      rescue StandardError => err
        log "Error in #save_queued_posts: #{err}"

        unless err.message == "nesting of 100 is too deep"
          log p.inspect
          log err.backtrace.reject { |x| x.include?('/ruby/') }
        end
      end
    end

    @post_queue = []
  end

  # Logs a timestamped line (preceded by a newline when progress dots are being
  # printed, so the message starts at the beginning of a line).
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Custom inspect that skips the potentially huge @feeds list.
  def inspect
    vars = instance_variables - [:@feeds]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end
end