+16
-6
app/firehose_stream.rb
+16
-6
app/firehose_stream.rb
···
57
@sky.on_connecting { |u| log "Connecting to #{u}..." }
58
59
@sky.on_connect {
60
@replaying = !!(cursor)
61
log "Connected ✓"
62
}
···
122
if @replaying
123
log "Replaying events since #{msg.time.getlocal} -->"
124
@replaying = false
125
end
126
127
msg.operations.each do |op|
···
224
puts text
225
end
226
227
-
if @save_posts == :all || @save_posts && matched
228
@post_queue << post
229
-
end
230
231
-
# wait until we have 100 posts and then save them all in one insert, if possible
232
-
if @post_queue.length >= POSTS_BATCH_SIZE
233
-
save_queued_posts
234
-
save_cursor(@sky.cursor)
235
end
236
237
print '.' if @show_progress && @log_posts != :all
···
57
@sky.on_connecting { |u| log "Connecting to #{u}..." }
58
59
@sky.on_connect {
60
+
@message_counter = 0
61
@replaying = !!(cursor)
62
log "Connected ✓"
63
}
···
123
if @replaying
124
log "Replaying events since #{msg.time.getlocal} -->"
125
@replaying = false
126
+
end
127
+
128
+
@message_counter += 1
129
+
130
+
if @message_counter % 100 == 0
131
+
# save current cursor every 100 events
132
+
save_cursor(msg.seq)
133
end
134
135
msg.operations.each do |op|
···
232
puts text
233
end
234
235
+
if @save_posts == :all
236
+
# wait until we have 100 posts and then save them all in one insert, if possible
237
@post_queue << post
238
239
+
if @post_queue.length >= POSTS_BATCH_SIZE
240
+
save_queued_posts
241
+
end
242
+
elsif @save_posts == :matching && matched
243
+
# save immediately because matched posts might be rare; we've already checked validations
244
+
post.save!(validate: false)
245
end
246
247
print '.' if @show_progress && @log_posts != :all