# Template of a custom feed generator service for the Bluesky network, written in Ruby.
require 'json'
require 'time'

require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'

# Consumes a Bluesky Jetstream stream of post records, matches new posts against the BlueFactory
# feeds that are currently updating, and stores posts (plus their feed assignments) in the
# database in batches. The stream position is persisted through the Subscription model so a
# restart can resume from where the previous run stopped.
class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
  POSTS_BATCH_SIZE = 100

  # Posts whose createdAt is more than this many seconds before the event time are skipped
  # (e.g. old tweets imported into Bluesky with some kind of tool).
  BACKDATE_LIMIT = 86_400

  # @param service [String, nil] Jetstream host to connect to; defaults to DEFAULT_JETSTREAM
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
  end

  # Opens the Jetstream connection and starts processing messages. Does nothing if a connection
  # object already exists. When @replay_events is set, it resumes from @start_cursor or, failing
  # that, from the cursor saved for this service; otherwise it connects without a cursor.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    @sky = sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message { |m| process_message(m) }

    # saving the cursor is not a logging concern: register this even when status logging is
    # off, so the latest position is persisted and not just the last periodic checkpoint
    @sky.on_disconnect do
      log "Disconnected." if @log_status
      save_cursor(sky.cursor)
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect do
        @replaying = !!cursor
        log "Connected ✓"
      end

      @sky.on_timeout { log "Trying to reconnect..." }
      @sky.on_reconnect { log "Connection lost, reconnecting..." }
      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Flushes queued posts to the database and closes the connection, if one is open. The cursor
  # is saved by the on_disconnect callback registered in #start.
  def stop
    save_queued_posts
    @sky&.disconnect
    @sky = nil
  end

  # Returns the stream position saved for @service, or nil if there isn't one.
  #
  # On first use this creates the Subscription row with a cursor of 0 as a placeholder. Later
  # calls also read that 0 as "no cursor". Otherwise a subscription that never saved a real
  # position would be resumed from cursor 0, which presumably makes the server replay from its
  # oldest retained event.
  def load_or_init_cursor
    sub = Subscription.find_by(service: @service)

    if sub.nil?
      Subscription.create!(service: @service, cursor: 0)
      return nil
    end

    sub.cursor unless sub.cursor.to_i.zero?
  end

  # Persists the given stream position for @service. A nil cursor carries no position (e.g.
  # presumably when disconnecting before any event arrived), so it is ignored instead of
  # overwriting a previously saved value.
  def save_cursor(cursor)
    return if cursor.nil?

    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Dispatches one stream message by its type. Commits are processed, account deletions purge
  # stored data, and any type not handled here is logged as unknown. The type check works for
  # both Jetstream and firehose message classes.
  def process_message(msg)
    case msg.type
    when :commit
      process_commit(msg)
    when :info
      # AtProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"
    when :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"
    when :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)
    else
      log "Unknown message type: #{msg.type} (#{msg.seq})"
    end
  end

  # When an account's status is :deleted, removes its posts and their feed assignments.
  def process_account_message(msg)
    return unless msg.status == :deleted

    # delete all data we have stored about this account
    FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
    Post.where(repo: msg.did).delete_all
  end

  # Handles a single post operation from a commit message.
  #
  # A delete removes the stored post, if any. A create is validated, matched against every
  # updating feed, optionally printed, and queued for a batched save (see #save_queued_posts).
  # Other actions are ignored. Errors are logged rather than raised, so one bad record can't
  # stop the stream.
  #
  # @param msg the commit message the operation came from (its seq and time are used)
  # @param op the post operation (its action, repo, rkey, uri and raw_record are used)
  def process_post(msg, op)
    Post.find_by_repo_rkey(op.repo, op.rkey)&.destroy if op.action == :delete

    return unless op.action == :create

    begin
      if op.raw_record.nil?
        log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(op.raw_record['createdAt'])
      return if post_time < msg.time - BACKDATE_LIMIT
    rescue StandardError
      log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = op.raw_record['text']

    # to save space, drop the redundant post text and type from the saved data JSON
    trimmed_json = JSON.generate(op.raw_record.except('$type', 'text'))

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: op.raw_record
    )

    unless post.valid?
      # if the data column failed validation, let the model trim it and re-validate
      post.trim_too_long_data if post.errors.has_key?(:data)

      unless post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      next unless feed.post_matches?(post)

      post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) if @save_posts
      matched = true
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    @post_queue << post if @save_posts == :all || (@save_posts && matched)

    # wait until we have a full batch of posts and then save them all in one insert, if possible
    save_queued_posts if @post_queue.length >= POSTS_BATCH_SIZE

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log_exception('#process_post', e, msg)
  end

  # Writes all queued posts to the database and empties the queue.
  #
  # Posts without feed_posts are squashed into one insert_all. Posts with nested feed_posts are
  # saved individually inside one transaction. If anything in the batch path raises, the error
  # is logged and the remaining queue is retried one post per transaction, with each failure
  # logged and skipped.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:
    matched, unmatched = @post_queue.partition { |post| post.feed_posts.length > 0 }

    Post.insert_all(unmatched.map { |post| post.attributes.except('id') }) unless unmatched.empty?

    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:
    ActiveRecord::Base.transaction do
      # skip validations since we've checked the posts before adding them to the queue
      @post_queue.each { |post| post.save!(validate: false) }
    end

    @post_queue = []
  rescue StandardError => e
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:
    log "Error in #save_queued_posts (batch): #{e}"

    @post_queue.each do |post|
      begin
        ActiveRecord::Base.transaction { post.save!(validate: false) }
      rescue StandardError => error
        log_exception('#save_queued_posts', error, post)
      end
    end

    @post_queue = []
  end

  # Prints a timestamped line. When progress dots are enabled it starts on a fresh line first,
  # so the message isn't appended to a row of dots.
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Compact inspect that skips the (potentially large) feed list; the id is shown in hex.
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end

  private

  # Handles a commit message. It announces the first replayed event once, checkpoints the
  # cursor whenever seq is a multiple of 10, and routes each operation by its record type.
  def process_commit(msg)
    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    save_cursor(msg.seq) if (msg.seq % 10).zero?

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Logs an error raised in `context`. Unless it is the "nesting of 100 is too deep" error
  # (presumably from JSON on deeply nested records), it also logs the subject being processed
  # and the backtrace, with Ruby-internal frames removed, one frame per line.
  def log_exception(context, error, subject)
    log "Error in #{context}: #{error}"
    return if error.message == "nesting of 100 is too deep"

    log subject.inspect
    log error.backtrace.to_a.reject { |line| line.include?('/ruby/') }.join("\n")
  end
end