Don't forget to lycansubscribe

process items from the firehose in the worker

Changed files
+32 -8
app
+24 -6
app/import_worker.rb
··· 3 require_relative 'init' 4 require_relative 'import_manager' 5 require_relative 'models/import' 6 require_relative 'reports/basic_report' 7 8 class ImportWorker ··· 30 def run 31 @user_threads = [] 32 33 loop do 34 @user_threads.delete_if { |t| !t.alive? } 35 36 - ActiveRecord::Base.transaction do 37 - if user = User.with_unfinished_imports.where.not(id: @user_threads.map(&:user_id)).first 38 - collections = user.imports.unfinished.map(&:collection) 39 - thread = UserThread.new(user, collections, @verbose) 40 - @user_threads << thread 41 - end 42 end 43 44 # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify 45 sleep 5 46 end 47 end 48 end
··· 3 require_relative 'init' 4 require_relative 'import_manager' 5 require_relative 'models/import' 6 + require_relative 'post_downloader' 7 require_relative 'reports/basic_report' 8 9 class ImportWorker ··· 31 def run 32 @user_threads = [] 33 34 + @firehose_thread = Thread.new { process_firehose_items } 35 + @downloader = PostDownloader.new 36 + 37 loop do 38 @user_threads.delete_if { |t| !t.alive? } 39 40 + if user = User.with_unfinished_imports.where.not(id: @user_threads.map(&:user_id)).first 41 + collections = user.imports.unfinished.map(&:collection) 42 + thread = UserThread.new(user, collections, @verbose) 43 + @user_threads << thread 44 end 45 46 # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify 47 sleep 5 48 + end 49 + end 50 + 51 + def process_firehose_items 52 + loop do 53 + items = [Like, Repost, Pin, Quote] 54 + .map { |type| type.in_queue(:firehose).order('time').limit(25) } 55 + .flatten 56 + .sort_by(&:time) 57 + .first(25) 58 + 59 + if items.length > 0 60 + @downloader.process_items(items) 61 + else 62 + sleep 5 63 + end 64 end 65 end 66 end
+8 -2
app/post_downloader.rb
··· 31 32 @report&.update(queue: { length: queue.length }) 33 34 existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a 35 36 items.dup.each do |item| ··· 40 end 41 end 42 43 - next if items.empty? 44 45 begin 46 response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq }) ··· 67 rescue StandardError => e 68 puts "Error in PostDownloader: #{e.class}: #{e}" 69 end 70 - end 71 end 72 73 def save_post(post_uri, record)
··· 31 32 @report&.update(queue: { length: queue.length }) 33 34 + process_items(items) 35 + end 36 + end 37 + 38 + def process_items(items) 39 + # 40 existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a 41 42 items.dup.each do |item| ··· 46 end 47 end 48 49 + return if items.empty? 50 51 begin 52 response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq }) ··· 73 rescue StandardError => e 74 puts "Error in PostDownloader: #{e.class}: #{e}" 75 end 76 + # 77 end 78 79 def save_post(post_uri, record)