require 'active_record'
require 'minisky'
require 'time'

require_relative 'init'
require_relative 'import_manager'
require_relative 'models/import'
require_relative 'post_downloader'
require_relative 'reports/basic_report'

# Long-running worker that starts per-user import threads and, in a separate
# background thread, feeds firehose-queued items to a PostDownloader.
#
# Set +logger+ and +verbose+ (optional) before calling #run, which loops forever.
class ImportWorker
  # Maximum number of firehose-queued items handed to the downloader per batch.
  FIREHOSE_BATCH_SIZE = 25

  # Seconds to sleep between polls (for new users to import / new queued items).
  POLL_INTERVAL = 5

  attr_accessor :verbose, :logger

  # A thread that runs ImportManager for one user's unfinished collections.
  # The thread starts running as soon as it is constructed.
  class UserThread < Thread
    # @param user [User] user whose imports are run
    # @param collections [Array] collection names from the user's unfinished imports
    # @param logger [Logger, nil] when set, progress is logged and attached to the import
    # @param verbose [Boolean] when true (and a logger is set), a BasicReport is attached too
    def initialize(user, collections, logger, verbose = false)
      @user = user
      @verbose = verbose
      @logger = logger

      # Thread#initialize starts the thread with this block.
      super { perform_import(collections) }
    end

    # @return id of the user this thread imports for; ImportWorker uses it to
    #   avoid starting a second thread for the same user.
    def user_id
      @user.id
    end

    # Looks up the account creation time through the AppView's
    # app.bsky.actor.getProfile endpoint (APPVIEW_HOST, or public.api.bsky.app).
    #
    # @param user [User] user whose +did+ is looked up
    # @return [Time] parsed +createdAt+ of the profile
    def get_registration_time(user)
      sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
      profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })

      # NOTE(review): Time.parse raises TypeError if the profile has no createdAt,
      # which ends this thread; the worker loop will then retry on its next poll.
      Time.parse(profile['createdAt'])
    end

    private

    # Thread body. Deliberately NOT named +run+: Thread#run is a core method
    # (wakes a sleeping thread) and must keep its zero-argument behavior.
    #
    # Fills in the user's missing registration time, then runs the import of
    # +collections+ to completion on this thread.
    def perform_import(collections)
      @logger&.info "Starting import thread for #{@user}"

      if @user.registered_at.nil?
        @user.update!(registered_at: get_registration_time(@user))
      end

      import = ImportManager.new(@user)

      if @logger
        import.report = BasicReport.new(@logger) if @verbose
        import.logger = @logger
        import.log_status_updates = true
      end

      import.start(collections)

      @logger&.info "Ended import thread for #{@user}"
    end
  end

  # Main loop; never returns.
  #
  # Starts the firehose-processing thread, then every POLL_INTERVAL seconds
  # starts a UserThread for each user with an unfinished import that does not
  # already have a live thread.
  def run
    @user_threads = []

    # The downloader must exist before the firehose thread starts, because that
    # thread calls @downloader.process_items as soon as it finds queued items.
    @downloader = PostDownloader.new
    @downloader.logger = @logger

    # NOTE(review): this thread is never restarted if it dies.
    @firehose_thread = Thread.new { process_firehose_items }

    loop do
      # Drop finished (or crashed) threads so their users can be picked up again
      # if they still have unfinished imports.
      @user_threads.keep_if(&:alive?)

      busy_user_ids = @user_threads.map(&:user_id)

      User.with_unfinished_import.where.not(id: busy_user_ids).to_a.each do |user|
        collections = user.imports.unfinished.map(&:collection)
        @user_threads << UserThread.new(user, collections, @logger, @verbose)
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep POLL_INTERVAL
    end
  end

  # Firehose thread body; never returns.
  #
  # Repeatedly takes the oldest FIREHOSE_BATCH_SIZE items (by +time+) across
  # Like, Repost, Pin and Quote records returned by in_queue(:firehose) and
  # passes them to the downloader; sleeps POLL_INTERVAL seconds when none are found.
  def process_firehose_items
    loop do
      # Fetching the oldest N of each type and then keeping the oldest N of the
      # merged list yields the globally oldest N items.
      items = [Like, Repost, Pin, Quote]
        .flat_map { |type| type.in_queue(:firehose).order('time').limit(FIREHOSE_BATCH_SIZE).to_a }
        .sort_by(&:time)
        .first(FIREHOSE_BATCH_SIZE)

      if items.empty?
        sleep POLL_INTERVAL
      else
        @downloader.process_items(items)
      end
    end
  end
end