# Template of a custom feed generator service for the Bluesky network, written in Ruby.
1require 'json'
2require 'sinatra/activerecord'
3require 'skyfall'
4
5require_relative 'config'
6require_relative 'models/feed_post'
7require_relative 'models/post'
8require_relative 'models/subscription'
9require_relative 'utils'
10
# Connects to a Jetstream service through Skyfall, runs every received post through the
# feeds returned by BlueFactory and stores the posts (and their feed assignments) in the database.
#
# Options that can be changed through the accessors before calling #start:
#
# * start_cursor  - cursor to use instead of the one stored in the subscriptions table
# * show_progress - print a "." for every processed post
# * log_status    - log connection state changes
# * log_posts     - :all to print every post's text, any other truthy value (e.g. :matching)
#                   to print only posts matching a feed, false to print none
# * save_posts    - :all to save every post, any other truthy value (e.g. :matching)
#                   to save only posts matching a feed, false to save none
# * replay_events - if true, a cursor is passed to the server when connecting
class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
  POSTS_BATCH_SIZE = 100

  # posts with a createdAt older than this many seconds (relative to the event time) are ignored
  MAX_POST_AGE = 86_400

  # message of the error for which we skip printing the failed object and the backtrace
  NESTING_ERROR_MESSAGE = "nesting of 100 is too deep"

  include Utils

  # @param service [String, nil] hostname of the Jetstream server; DEFAULT_JETSTREAM if nil
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
  end

  # Creates the Skyfall client, registers the callbacks and connects.
  # Does nothing if a client has already been created.
  def start
    return if @sky

    # called even if we aren't replaying, because it also creates the subscription record
    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    @sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message { |m| process_message(m) }

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect do
        # makes #process_message print a "Replaying events since..." line on the first commit
        @replaying = !!cursor
        log "Connected ✓"
      end

      @sky.on_disconnect { log "Disconnected." }
      @sky.on_timeout { log "Trying to reconnect..." }
      @sky.on_reconnect { log "Connection lost, reconnecting..." }
      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Saves any queued posts and the current cursor, then disconnects and drops the client.
  def stop
    save_queued_posts
    return if @sky.nil?

    # the client may not have a cursor yet (e.g. if no event was received);
    # in that case keep the previously stored position instead of overwriting it with nil
    cursor = @sky.cursor
    save_cursor(cursor) unless cursor.nil?

    @sky.disconnect
    @sky = nil
  end

  # Returns the cursor stored for this service. If there is no subscription record yet,
  # creates one (with cursor 0) and returns nil.
  def load_or_init_cursor
    subscription = Subscription.find_by(service: @service)
    return subscription.cursor if subscription

    Subscription.create!(service: @service, cursor: 0)
    nil
  end

  # Stores the given cursor in the subscription record(s) of this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles a single message received from the stream. Non-commit messages are only
  # logged or passed to #process_account_message; operations from commit messages
  # are dispatched by their type.
  def process_message(msg)
    case msg.type
    when :info
      # AtProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"

    when :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"

    when :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)

    else
      log "Unknown message type: #{msg.type} (#{msg.seq})" if unknown_message?(msg)
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Removes stored posts (and their feed assignments) of an account reported as deleted.
  def process_account_message(msg)
    return unless msg.status == :deleted

    # delete all data we have stored about this account
    FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
    Post.where(repo: msg.did).delete_all
  end

  # Handles a post operation: a delete removes the stored post (if any), a create builds
  # a Post record, matches it against the feeds and adds it to the save queue.
  # Other actions are ignored. Any StandardError raised on the way is logged, not re-raised.
  def process_post(msg, op)
    if op.action == :delete
      existing = Post.find_by_repo_rkey(op.repo, op.rkey)
      existing&.destroy
    end

    return unless op.action == :create

    record = decoded_record(msg, op)
    return if record.nil?

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    return unless recent_post?(msg, op, record)

    text = record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    record.delete('$type')
    record.delete('text')
    trimmed_json = JSON.generate(record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: record
    )

    unless post.valid?
      post.trim_too_long_data if post.errors.has_key?(:data)

      unless post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      next unless feed.post_matches?(post)

      post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) if @save_posts
      matched = true
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    if @save_posts == :all || (@save_posts && matched)
      @post_queue << post
    end

    # wait until we have 100 posts and then save them all in one insert, if possible
    if @post_queue.length >= POSTS_BATCH_SIZE
      save_queued_posts
      save_cursor(@sky.cursor)
    end

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log_processing_error('#process_post', e, msg)
  end

  # Saves all posts from the queue to the database and empties the queue.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:

    matched, unmatched = @post_queue.partition { |post| post.feed_posts.length > 0 }

    unless unmatched.empty?
      Post.insert_all(unmatched.map { |post| post.attributes.except('id') })
    end

    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:

    ActiveRecord::Base.transaction do
      # skip validations since we've checked the posts before adding them to the queue
      @post_queue.each { |post| post.save!(validate: false) }
    end

    @post_queue = []
  rescue StandardError
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:

    @post_queue.each do |post|
      begin
        ActiveRecord::Base.transaction { post.save!(validate: false) }
      rescue StandardError => e
        log_processing_error('#save_queued_posts', e, post)
      end
    end

    @post_queue = []
  end

  # Prints the text prefixed with the current time (on a new line if progress dots are printed).
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Lists all instance variables except @feeds.
  def inspect
    described = (instance_variables - [:@feeds]).map do |name|
      "#{name}=#{instance_variable_get(name).inspect}"
    end

    "#<#{self.class}:0x#{object_id} #{described.join(', ')}>"
  end

  private

  # Tells if the message is of a type that Skyfall doesn't recognize.
  def unknown_message?(msg)
    return true if msg.is_a?(Skyfall::Firehose::UnknownMessage)

    # NOTE(review): we connect using Skyfall::Jetstream, which presumably reports unrecognized
    # messages as Skyfall::Jetstream::UnknownMessage - confirm against the installed Skyfall
    # version; the defined? guard keeps this safe if that class doesn't exist
    !!(defined?(Skyfall::Jetstream::UnknownMessage) && msg.is_a?(Skyfall::Jetstream::UnknownMessage))
  end

  # Returns the raw record of the operation, or nil (after logging the reason)
  # if the record is missing or can't be decoded.
  def decoded_record(msg, op)
    record = op.raw_record
    log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})" if record.nil?
    record
  rescue CBOR::UnpackError => e
    log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
    nil
  end

  # Tells if the record's createdAt is at most MAX_POST_AGE seconds older than the event time.
  # Returns false (after logging it) if the timestamp can't be parsed or compared.
  def recent_post?(msg, op, record)
    Time.parse(record['createdAt']) >= msg.time - MAX_POST_AGE
  rescue StandardError
    log "Skipping post with invalid timestamp: #{record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
    false
  end

  # Logs an error raised in the given method, followed by the object that was being processed
  # and the backtrace (without lines containing '/ruby/'), unless it's the JSON nesting error.
  def log_processing_error(method_name, error, subject)
    log "Error in #{method_name}: #{error}"
    return if error.message == NESTING_ERROR_MESSAGE

    log subject.inspect
    log error.backtrace.reject { |line| line.include?('/ruby/') }
  end
end