# Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'time'

require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'utils'
10
class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_posts, :log_status, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
  POSTS_BATCH_SIZE = 100

  include Utils

  # Consumes events from a Bluesky Jetstream instance, matches incoming posts
  # against the configured feeds and saves them to the database in batches.
  #
  # service - hostname of the Jetstream instance to connect to
  #           (defaults to DEFAULT_JETSTREAM)
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    # development defaults favor console visibility; production favors
    # replaying missed events and saving only posts matching some feed
    @show_progress = (@env == :development)
    @log_status = true
    @log_posts = (@env == :development) ? :matching : false
    @save_posts = (@env == :development) ? :all : :matching
    @replay_events = (@env != :development)

    # only feeds that are still being updated need new posts
    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
  end

  # Opens the websocket connection and starts processing events.
  # No-op if the stream is already running.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    @sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message do |m|
      process_message(m)
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect {
        # when resuming from a cursor, the first commit marks the replay start
        @replaying = !!cursor
        log "Connected ✓"
      }

      @sky.on_disconnect {
        log "Disconnected."
      }

      @sky.on_reconnect {
        log "Connection lost, reconnecting..."
      }

      @sky.on_timeout {
        log "Trying to reconnect..."
      }

      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Flushes any queued posts, persists the current cursor and disconnects.
  def stop
    save_queued_posts
    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  # Returns the stored cursor for this service; on first run, creates the
  # subscription row and returns nil so streaming starts from "now".
  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Persists the given cursor position for this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Dispatches a single websocket message by type; only :commit messages
  # (repo operations) are processed further below.
  def process_message(msg)
    if msg.type == :info
      # ATProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"

    elsif msg.type == :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"

    elsif msg.type == :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)

    elsif msg.unknown?
      log "Unknown message type: #{msg.type} (#{msg.seq})"
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts table, then add/remove records here depending on op.action
        # (you'll need to track like records and not just have a single numeric "likes" field,
        # because delete events only include the uri/rkey of the like, not of the liked post)

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles account status changes; wipes all stored data for deleted accounts.
  def process_account_message(msg)
    if msg.status == :deleted
      # delete all data we have stored about this account
      # (hash conditions on a joined table must use the table name, "posts",
      # not the association name - otherwise the generated SQL is invalid)
      FeedPost.joins(:post).where(posts: { repo: msg.did }).delete_all
      Post.where(repo: msg.did).delete_all
    end
  end

  # Processes a single post operation: removes deleted posts, validates and
  # queues created ones, and flushes the queue once a full batch is collected.
  def process_post(msg, op)
    if op.action == :delete
      if post = Post.find_by_repo_rkey(op.repo, op.rkey)
        post.destroy
      end
    end

    return unless op.action == :create

    begin
      if op.raw_record.nil?
        log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(op.raw_record['createdAt'])
      return if post_time < msg.time - 86400
    rescue StandardError
      log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    record = op.raw_record
    text = record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    record.delete('$type')
    record.delete('text')
    trimmed_json = JSON.generate(record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: record
    )

    if !post.valid?
      # an oversized data JSON can be trimmed; any other error makes the post unusable
      if post.errors.has_key?(:data)
        post.trim_too_long_data
      end

      if !post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      if feed.post_matches?(post)
        post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) if @save_posts
        matched = true
      end
    end

    if @log_posts == :all || @log_posts && matched
      puts
      puts text
    end

    if @save_posts == :all || @save_posts && matched
      @post_queue << post
    end

    # wait until we have 100 posts and then save them all in one insert, if possible
    if @post_queue.length >= POSTS_BATCH_SIZE
      save_queued_posts
      save_cursor(@sky.cursor)
    end

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log "Error in #process_post: #{e}"

    unless e.message == "nesting of 100 is too deep"
      log msg.inspect
      log e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  # Persists the queued posts: posts without nested feed_posts are squashed into
  # a single multi-row insert, the rest are saved normally in one transaction.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:

    matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 }

    if unmatched.length > 0
      values = unmatched.map { |p| p.attributes.except('id') }
      Post.insert_all(values)
    end

    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:

    ActiveRecord::Base.transaction do
      @post_queue.each do |p|
        # skip validations since we've checked the posts before adding them to the queue
        p.save!(validate: false)
      end
    end

    @post_queue = []
  rescue StandardError => e
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:

    @post_queue.each do |p|
      begin
        ActiveRecord::Base.transaction do
          p.save!(validate: false)
        end
      rescue StandardError => e
        log "Error in #save_queued_posts: #{e}"

        unless e.message == "nesting of 100 is too deep"
          log p.inspect
          log e.backtrace.reject { |x| x.include?('/ruby/') }
        end
      end
    end

    @post_queue = []
  end

  # Logs a timestamped message; prints a newline first when progress dots are
  # enabled, so the message always starts on its own line.
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Custom inspect that skips the bulky @feeds list and renders the object id
  # in hex (a decimal id after a "0x" prefix would be misleading).
  def inspect
    vars = instance_variables - [:@feeds]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{format('%016x', object_id)} #{values}>"
  end
end
303end