+58
-2
app/firehose_stream.rb
+58
-2
app/firehose_stream.rb
···
11
11
attr_accessor :start_cursor, :show_progress, :log_status, :log_posts, :save_posts, :replay_events
12
12
13
13
DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'
14
+
# Number of posts that may pile up in @post_queue before the queue is written out
# with save_queued_posts.
POSTS_BATCH_SIZE = 100
14
15
15
16
def initialize(service = nil)
16
17
@env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
···
23
24
@replay_events = (@env == :development) ? false : true
24
25
25
26
@feeds = BlueFactory.all_feeds.select(&:is_updating?)
27
+
@post_queue = []
26
28
end
27
29
28
30
def start
···
70
72
end
71
73
72
74
# Writes out any posts still sitting in the queue, then disconnects @sky
# (when one is set) and drops the reference to it.
def stop
  # flush first, so that posts queued so far are not lost on shutdown
  save_queued_posts

  @sky.disconnect unless @sky.nil?
  @sky = nil
end
···
203
206
204
207
@feeds.each do |feed|
205
208
if feed.post_matches?(post)
206
-
FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) unless !@save_posts
209
+
post.feed_posts.build(feed_id: feed.feed_id, time: msg.time) unless !@save_posts
207
210
matched = true
208
211
end
209
212
end
···
213
216
puts text
214
217
end
215
218
216
-
post.save! if @save_posts == :all
219
+
if @save_posts == :all || @save_posts && matched
220
+
@post_queue << post
221
+
end
222
+
223
+
# wait until we have 100 posts and then save them all in one insert, if possible
224
+
if @post_queue.length >= POSTS_BATCH_SIZE
225
+
save_queued_posts
226
+
end
217
227
218
228
print '.' if @show_progress && @log_posts != :all
219
229
rescue StandardError => e
···
223
233
log msg.inspect
224
234
log e.backtrace.reject { |x| x.include?('/ruby/') }
225
235
end
236
+
end
237
+
238
+
# Writes everything in @post_queue to the database and leaves the queue empty.
#
# Posts with no feed_posts attached are inserted together with one Post.insert_all call;
# posts that do have feed_posts are saved individually inside a single transaction.
# If a StandardError is raised along the way, each post still in the queue is retried in
# its own transaction, and a failure of an individual save is logged rather than re-raised.
def save_queued_posts
  # a multi-row insert can't carry nested feed_posts, so the posts without any go first
  plain_posts, posts_with_feeds = @post_queue.partition { |post| post.feed_posts.length.zero? }

  unless plain_posts.empty?
    rows = plain_posts.map { |post| post.attributes.except('id') }
    Post.insert_all(rows)
  end

  # from this point on, only the posts that have feed_posts are still pending
  @post_queue = posts_with_feeds
  return if @post_queue.empty?

  # those are saved the regular way, all wrapped in one transaction
  ActiveRecord::Base.transaction do
    # validations are skipped, because the posts were checked before being queued
    @post_queue.each { |post| post.save!(validate: false) }
  end

  @post_queue = []
rescue StandardError
  # ActiveRecord errors aren't expected here, but SQLite may still reject something that
  # the AR validations don't cover; in that case, retry whatever is still queued post by post
  @post_queue.each do |post|
    begin
      ActiveRecord::Base.transaction { post.save!(validate: false) }
    rescue StandardError => error
      log "Error in #save_queued_posts: #{error}"

      # for this particular message, only the one-line error above is logged
      if error.message != "nesting of 100 is too deep"
        log post.inspect
        log error.backtrace.reject { |line| line.include?('/ruby/') }
      end
    end
  end

  @post_queue = []
end
227
283
228
284
def log(text)