# Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'time'

require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'utils'

class FirehoseStream
  # Runtime switches (defaults are set per-environment in #initialize):
  # - start_cursor:  explicit cursor to start streaming from (overrides the saved one)
  # - show_progress: print a '.' to stdout for each processed post
  # - log_status:    log connection lifecycle events (connect, disconnect, errors)
  # - log_posts:     which post texts to print — :all, :matching, or false
  # - save_posts:    which posts to store in the DB — :all, :matching, or false
  # - replay_events: whether to resume from the last saved cursor on connect
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  # Jetstream host used when no service is passed to #initialize.
  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'

  # Queued posts are flushed to the database in batches of this size.
  POSTS_BATCH_SIZE = 100

  include Utils

19 def initialize(service = nil)
20 @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
21 @service = service || DEFAULT_JETSTREAM
22
23 @show_progress = (@env == :development) ? true : false
24 @log_status = true
25 @log_posts = (@env == :development) ? :matching : false
26 @save_posts = (@env == :development) ? :all : :matching
27 @replay_events = (@env == :development) ? false : true
28
29 @feeds = BlueFactory.all_feeds.select(&:is_updating?)
30 @post_queue = []
31 end
32
33 def start
34 return if @sky
35
36 last_cursor = load_or_init_cursor
37 cursor = @replay_events ? (@start_cursor || last_cursor) : nil
38
39 @sky = sky = Skyfall::Jetstream.new(@service, {
40 cursor: cursor,
41
42 # we ask Jetstream to only send us post records, since we don't need anything else
43 # if you need to process e.g. likes or follows too, update or remove this param
44 wanted_collections: ['app.bsky.feed.post'],
45 })
46
47 # set your user agent here to identify yourself on the relay
48 # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"
49
50 @sky.check_heartbeat = true
51
52 @sky.on_message do |m|
53 process_message(m)
54 end
55
56 if @log_status
57 @sky.on_connecting { |u| log "Connecting to #{u}..." }
58
59 @sky.on_connect {
60 @replaying = !!(cursor)
61 log "Connected ✓"
62 }
63
64 @sky.on_disconnect {
65 log "Disconnected."
66 }
67
68 @sky.on_timeout { log "Trying to reconnect..." }
69 @sky.on_reconnect { log "Connection lost, reconnecting..." }
70 @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
71 end
72
73 @sky.connect
74 end
75
76 def stop
77 save_queued_posts
78 save_cursor(@sky.cursor) unless @sky.nil?
79
80 @sky&.disconnect
81 @sky = nil
82 end
83
84 def load_or_init_cursor
85 if sub = Subscription.find_by(service: @service)
86 sub.cursor
87 else
88 Subscription.create!(service: @service, cursor: 0)
89 nil
90 end
91 end
92
93 def save_cursor(cursor)
94 Subscription.where(service: @service).update_all(cursor: cursor)
95 end
96
97 def process_message(msg)
98 if msg.type == :info
99 # AtProto error, the only one right now is "OutdatedCursor"
100 log "InfoMessage: #{msg}"
101
102 elsif msg.type == :identity
103 # use these events if you want to track handle changes:
104 # log "Handle change: #{msg.repo} => #{msg.handle}"
105
106 elsif msg.type == :account
107 # tracking account status changes, e.g. suspensions, deactivations and deletes
108 process_account_message(msg)
109
110 elsif msg.is_a?(Skyfall::Firehose::UnknownMessage)
111 log "Unknown message type: #{msg.type} (#{msg.seq})"
112 end
113
114 return unless msg.type == :commit
115
116 if @replaying
117 log "Replaying events since #{msg.time.getlocal} -->"
118 @replaying = false
119 end
120
121 msg.operations.each do |op|
122 case op.type
123 when :bsky_post
124 process_post(msg, op)
125
126 when :bsky_like, :bsky_repost
127 # if you want to use the number of likes and/or reposts for filtering or sorting:
128 # add a likes/reposts table, then add/remove records here depending on op.action
129 # (you'll need to track like records and not just have a single numeric "likes" field,
130 # because delete events only include the uri/rkey of the like, not of the liked post)
131
132 when :bsky_follow
133 # if you want to make a personalized feed that needs info about given user's follows/followers:
134 # add a followers table, then add/remove records here depending on op.action
135
136 else
137 # other types like :bsky_block, :bsky_profile (includes profile edits)
138 end
139 end
140 end
141
  # Handles account status events (suspensions, deactivations, deletes).
  # When an account is deleted, removes all data stored for that DID.
  def process_account_message(msg)
    if msg.status == :deleted
      # delete all data we have stored about this account
      # NOTE(review): `where(post: ...)` assumes the joined table/alias is named
      # "post" — verify against the FeedPost association (conventionally the
      # hash key here would be the table name, `posts`)
      FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
      Post.where(repo: msg.did).delete_all
    end
  end

150 def process_post(msg, op)
151 if op.action == :delete
152 if post = Post.find_by_repo_rkey(op.repo, op.rkey)
153 post.destroy
154 end
155 end
156
157 return unless op.action == :create
158
159 begin
160 if op.raw_record.nil?
161 log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
162 return
163 end
164 rescue CBOR::UnpackError => e
165 log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
166 return
167 end
168
169 # ignore posts with past date from Twitter etc. imported using some kind of tool
170 begin
171 post_time = Time.parse(op.raw_record['createdAt'])
172 return if post_time < msg.time - 86400
173 rescue StandardError => e
174 log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
175 return
176 end
177
178 record = op.raw_record
179 text = record['text']
180
181 # to save space, delete redundant post text and type from the saved data JSON
182 record.delete('$type')
183 record.delete('text')
184 trimmed_json = JSON.generate(record)
185
186 # tip: if you don't need full record data for debugging, delete the data column in posts
187 post = Post.new(
188 repo: op.repo,
189 time: msg.time,
190 text: text,
191 rkey: op.rkey,
192 data: trimmed_json,
193 record: record
194 )
195
196 if !post.valid?
197 if post.errors.has_key?(:data)
198 post.trim_too_long_data
199 end
200
201 if !post.valid?
202 log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
203 return
204 end
205 end
206
207 matched = false
208
209 @feeds.each do |feed|
210 if feed.post_matches?(post)
211 post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) unless !@save_posts
212 matched = true
213 end
214 end
215
216 if @log_posts == :all || @log_posts && matched
217 puts
218 puts text
219 end
220
221 if @save_posts == :all || @save_posts && matched
222 @post_queue << post
223 end
224
225 # wait until we have 100 posts and then save them all in one insert, if possible
226 if @post_queue.length >= POSTS_BATCH_SIZE
227 save_queued_posts
228 save_cursor(@sky.cursor)
229 end
230
231 print '.' if @show_progress && @log_posts != :all
232 rescue StandardError => e
233 log "Error in #process_post: #{e}"
234
235 unless e.message == "nesting of 100 is too deep"
236 log msg.inspect
237 log e.backtrace.reject { |x| x.include?('/ruby/') }
238 end
239 end
240
  # Flushes the queued posts to the database using as few queries as possible,
  # falling back to one-by-one saves if a bulk write fails.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:

    matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 }

    if unmatched.length > 0
      # insert_all skips callbacks/validations; 'id' is left for the DB to assign
      values = unmatched.map { |p| p.attributes.except('id') }
      Post.insert_all(values)
    end

    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:

    ActiveRecord::Base.transaction do
      @post_queue.each do |p|
        # skip validations since we've checked the posts before adding them to the queue
        p.save!(validate: false)
      end
    end

    @post_queue = []
  rescue StandardError => e
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:

    @post_queue.each do |p|
      begin
        ActiveRecord::Base.transaction do
          p.save!(validate: false)
        end
      rescue StandardError => e
        log "Error in #save_queued_posts: #{e}"

        # same deeply-nested-JSON case as in #process_post — skip the noisy details
        unless e.message == "nesting of 100 is too deep"
          log p.inspect
          log e.backtrace.reject { |x| x.include?('/ruby/') }
        end
      end
    end

    @post_queue = []
  end

287 def log(text)
288 puts if @show_progress
289 puts "[#{Time.now}] #{text}"
290 end
291
292 def inspect
293 vars = instance_variables - [:@feeds]
294 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
295 "#<#{self.class}:0x#{object_id} #{values}>"
296 end
end