# Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'sinatra/activerecord'
require 'skyfall'
require 'time'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'
9
# Connects to a Jetstream service through Skyfall, runs incoming post records through
# the updating feeds returned by BlueFactory, and saves posts to the database in batches.
#
# Behavior can be tuned through the accessors below before calling #start:
# * start_cursor  - cursor to resume from instead of the one stored in the subscriptions table
# * show_progress - print a '.' for every processed post
# * log_status    - log connection status changes and stream errors
# * log_posts     - :all, :matching (any truthy value logs matching posts only) or false
# * save_posts    - :all, :matching (any truthy value saves matching posts only) or false
# * replay_events - when true, connect with a cursor in order to replay missed events
class FirehoseStream
  attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
  POSTS_BATCH_SIZE = 100

  # posts whose createdAt is more than this many seconds before the event time are skipped
  MAX_POST_AGE = 86400

  # errors with this message are logged without the offending object and backtrace
  NESTING_TOO_DEEP_MESSAGE = "nesting of 100 is too deep"

  # @param service [String, nil] hostname of the Jetstream service; DEFAULT_JETSTREAM when nil
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    development = (@env == :development)

    # note: #inspect lists the instance variables in assignment order
    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
    @post_queue = []
  end

  # Creates the Skyfall client, registers the callbacks and connects.
  # Does nothing if a client already exists.
  def start
    return if @sky

    # called even when not replaying, because it also creates the subscription row if it's missing
    last_cursor = load_or_init_cursor
    cursor = @replay_events ? (@start_cursor || last_cursor) : nil

    @sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message do |m|
      process_message(m)
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect {
        # @replaying is only used for printing the "Replaying events" line on the first commit
        @replaying = !!(cursor)
        log "Connected ✓"
      }

      @sky.on_disconnect {
        log "Disconnected."
      }

      @sky.on_timeout { log "Trying to reconnect..." }
      @sky.on_reconnect { log "Connection lost, reconnecting..." }
      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Saves any queued posts and the current cursor, then disconnects and drops the client.
  def stop
    save_queued_posts
    save_cursor(@sky.cursor) if @sky

    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for this service. If there is no subscription record yet,
  # creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    sub = Subscription.find_by(service: @service)
    return sub.cursor if sub

    Subscription.create!(service: @service, cursor: 0)
    nil
  end

  # Stores the given cursor in the subscription record(s) for this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles a single message from the stream: logs or dispatches non-commit messages,
  # and passes every operation of a commit message to the matching handler.
  def process_message(msg)
    if msg.type == :info
      # AtProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"

    elsif msg.type == :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"

    elsif msg.type == :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)

    # NOTE(review): the client created in #start is a Skyfall::Jetstream, but this checks
    # the Firehose message class - confirm that unknown Jetstream messages match it
    elsif msg.is_a?(Skyfall::Firehose::UnknownMessage)
      log "Unknown message type: #{msg.type} (#{msg.seq})"
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles an account status message; when the account was deleted, removes its stored posts.
  def process_account_message(msg)
    return unless msg.status == :deleted

    # delete all data we have stored about this account
    FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
    Post.where(repo: msg.did).delete_all
  end

  # Handles a single post operation: destroys the stored post on delete; on create, builds
  # a Post, matches it against the feeds and adds it to the save queue according to
  # save_posts. Other actions are ignored. Any StandardError raised here is logged and
  # not re-raised.
  #
  # @param msg the commit message that the operation belongs to
  # @param op the post operation
  def process_post(msg, op)
    if op.action == :delete
      existing = Post.find_by_repo_rkey(op.repo, op.rkey)
      existing.destroy if existing
    end

    return unless op.action == :create

    begin
      record = op.raw_record
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    if record.nil?
      log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(record['createdAt'])
      return if post_time < msg.time - MAX_POST_AGE
    rescue StandardError
      log "Skipping post with invalid timestamp: #{record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    # (this works on a copy, so the original record is left untouched)
    trimmed_record = record.dup
    trimmed_record.delete('$type')
    trimmed_record.delete('text')
    trimmed_json = JSON.generate(trimmed_record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: record
    )

    unless post.valid?
      post.trim_too_long_data if post.errors.has_key?(:data)

      unless post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      next unless feed.post_matches?(post)

      post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) if @save_posts
      matched = true
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    if @save_posts == :all || (@save_posts && matched)
      @post_queue << post
    end

    # wait until we have 100 posts and then save them all in one insert, if possible
    if @post_queue.length >= POSTS_BATCH_SIZE
      save_queued_posts
      save_cursor(@sky.cursor)
    end

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log_error('process_post', e, msg)
  end

  # Saves all posts from the queue to the database and empties the queue.
  def save_queued_posts
    # we can only squash posts into one insert statement if they don't have nested feed_posts
    # so we save those without feed_posts first:

    matched, unmatched = @post_queue.partition { |x| x.feed_posts.length > 0 }

    if unmatched.length > 0
      values = unmatched.map { |post| post.attributes.except('id') }
      Post.insert_all(values)
    end

    # from this point the queue only holds the posts that still need to be saved,
    # which is what the fallback in the rescue section below relies on
    @post_queue = matched
    return if @post_queue.empty?

    # and for those that do have feed_posts, we save them normally, in one transaction:

    ActiveRecord::Base.transaction do
      @post_queue.each do |post|
        # skip validations since we've checked the posts before adding them to the queue
        post.save!(validate: false)
      end
    end

    @post_queue = []
  rescue StandardError
    # there shouldn't be any ActiveRecord errors raised, but SQLite might find some issues which
    # aren't covered by AR validations; so in that case, try to save any valid posts one by one:

    @post_queue.each do |post|
      begin
        ActiveRecord::Base.transaction do
          post.save!(validate: false)
        end
      rescue StandardError => e
        log_error('save_queued_posts', e, post)
      end
    end

    @post_queue = []
  end

  # Prints the text prefixed with the current time. When show_progress is on, prints an empty
  # puts first (presumably to end the line of progress dots printed by #process_post).
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Returns a description of the object that lists all instance variables except @feeds.
  # (@timer is excluded too, although nothing in this class assigns it.)
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")

    # the id follows a "0x" prefix, so it needs to be printed in hex
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end

  private

  # Logs an error rescued in the given method, followed by the inspected object that was being
  # processed and the backtrace with lines containing '/ruby/' removed (those two are skipped
  # for the "nesting too deep" error).
  def log_error(method_name, error, subject)
    log "Error in ##{method_name}: #{error}"
    return if error.message == NESTING_TOO_DEEP_MESSAGE

    log subject.inspect
    log error.backtrace.reject { |x| x.include?('/ruby/') }
  end
end