Don't forget to lycansubscribe
at master 2.3 kB view raw
require 'active_record'
require 'minisky'
require 'time'

require_relative 'init'
require_relative 'import_manager'
require_relative 'models/import'
require_relative 'post_downloader'
require_relative 'reports/basic_report'

# Long-running worker that starts one import thread per user returned by
# User.with_unfinished_import, plus a single background thread that hands
# items queued under :firehose to a PostDownloader.
class ImportWorker
  # Seconds to sleep between polls (both for new users and for an empty firehose queue).
  POLL_INTERVAL = 5

  # Maximum number of firehose items passed to the downloader in one batch.
  FIREHOSE_BATCH_SIZE = 25

  attr_accessor :verbose, :logger

  # Thread that runs the import of the given collections for a single user.
  class UserThread < Thread
    # Stores the settings and immediately starts the thread body.
    #
    # @param user the user record to import for
    # @param collections the collections passed on to ImportManager#start
    # @param logger optional logger; when nil, nothing is logged
    # @param verbose when true (and a logger is set), a BasicReport is attached to the import
    def initialize(user, collections, logger, verbose = false)
      # Instance variables are assigned before `super`, because the block may
      # start executing as soon as the thread is created.
      @user = user
      @verbose = verbose
      @logger = logger

      super { run(collections) }
    end

    # @return the id of the user this thread is importing for
    def user_id
      @user.id
    end

    # Thread body: fills in the user's registration time if it is missing,
    # then runs an ImportManager for the given collections.
    #
    # @param collections the collections passed to ImportManager#start
    def run(collections)
      @logger&.info "Starting import thread for #{@user}"

      if @user.registered_at.nil?
        registration_time = get_registration_time(@user)
        @user.update!(registered_at: registration_time)
      end

      import = ImportManager.new(@user)

      if @logger
        import.report = BasicReport.new(@logger) if @verbose
        import.logger = @logger
        import.log_status_updates = true
      end

      import.start(collections)

      @logger&.info "Ended import thread for #{@user}"
    end

    # Looks up the user's profile through app.bsky.actor.getProfile on the
    # host named by ENV['APPVIEW_HOST'] (default: public.api.bsky.app) and
    # parses its 'createdAt' field.
    #
    # NOTE(review): Time.parse raises if the profile has no 'createdAt' —
    # confirm that every profile returned here includes it.
    #
    # @param user a record responding to #did
    # @return [Time] the parsed 'createdAt' value of the profile
    def get_registration_time(user)
      # The second argument is nil — presumably "no auth config"; confirm against Minisky.
      sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
      profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })

      Time.parse(profile['createdAt'])
    end
  end

  # Main loop; never returns. Starts the firehose thread, then every
  # POLL_INTERVAL seconds drops finished user threads and starts a new
  # UserThread for each user with an unfinished import that does not
  # already have a live thread.
  def run
    @user_threads = []

    # The downloader must be fully set up BEFORE the firehose thread starts:
    # that thread calls @downloader.process_items and would otherwise be able
    # to observe a nil @downloader.
    @downloader = PostDownloader.new
    @downloader.logger = @logger
    @firehose_thread = Thread.new { process_firehose_items }

    loop do
      @user_threads.delete_if { |thread| !thread.alive? }

      active_user_ids = @user_threads.map(&:user_id)
      users = User.with_unfinished_import.where.not(id: active_user_ids).to_a

      users.each do |user|
        collections = user.imports.unfinished.map(&:collection)
        @user_threads << UserThread.new(user, collections, @logger, @verbose)
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep POLL_INTERVAL
    end
  end

  # Firehose loop; never returns. Repeatedly takes up to FIREHOSE_BATCH_SIZE
  # of the earliest (by `time`) records queued under :firehose across Like,
  # Repost, Pin and Quote and passes them to the downloader; sleeps for
  # POLL_INTERVAL seconds when there are none.
  def process_firehose_items
    loop do
      # Each model contributes at most one batch worth of candidates; the
      # merged list is then re-sorted and trimmed to a single batch.
      candidates = [Like, Repost, Pin, Quote].flat_map do |type|
        type.in_queue(:firehose).order('time').limit(FIREHOSE_BATCH_SIZE).to_a
      end

      items = candidates.sort_by(&:time).first(FIREHOSE_BATCH_SIZE)

      if items.empty?
        sleep POLL_INTERVAL
      else
        @downloader.process_items(items)
      end
    end
  end
end