Don't forget to lycansubscribe
require 'active_record'
require 'minisky'
require 'time'

require_relative 'init'
require_relative 'import_manager'
require_relative 'models/import'
require_relative 'post_downloader'
require_relative 'reports/basic_report'

# Long-running worker with two responsibilities:
#
# * it starts one UserThread per user that has an unfinished import, and
# * on a separate thread, it passes queued firehose items to a PostDownloader.
#
# Set +verbose+ to true before calling #run to attach a BasicReport to each import.
class ImportWorker
  # Seconds to sleep between polls, both in the main loop and when the firehose queue is empty.
  POLL_INTERVAL = 5

  # Maximum number of firehose items handed to the downloader in one batch.
  FIREHOSE_BATCH_SIZE = 25

  attr_accessor :verbose

  # Thread that runs the import of the given collections for a single user.
  class UserThread < Thread
    # Stores the user and the verbose flag, then starts the thread, which calls #run.
    #
    # @param user [Object] user record; must respond to +id+, +did+, +registered_at+ and +update!+
    # @param collections [Array] collections to import, passed through to ImportManager#start
    # @param verbose [Boolean] when true, a BasicReport is attached to the import
    def initialize(user, collections, verbose = false)
      # Instance variables have to be assigned before `super`, because the thread
      # body may start executing as soon as the thread is created.
      @user = user
      @verbose = verbose

      super { run(collections) }
    end

    # @return the id of the user this thread is importing data for
    def user_id
      @user.id
    end

    # Fills in the user's +registered_at+ if it is missing, then starts the import.
    #
    # @param collections [Array] collections passed to ImportManager#start
    def run(collections)
      if @user.registered_at.nil?
        registration_time = get_registration_time(@user)
        @user.update!(registered_at: registration_time)
      end

      import = ImportManager.new(@user)
      import.report = BasicReport.new if @verbose
      import.start(collections)
    end

    # Requests the user's profile (app.bsky.actor.getProfile) from public.api.bsky.app
    # and parses its +createdAt+ field.
    #
    # @param user [Object] user record responding to +did+
    # @return [Time] the parsed +createdAt+ value
    def get_registration_time(user)
      profile = Minisky.new('public.api.bsky.app', nil).get_request('app.bsky.actor.getProfile', { actor: user.did })
      Time.parse(profile['createdAt'])
    end
  end

  # Starts the firehose processing thread, then loops forever: every POLL_INTERVAL
  # seconds it discards finished user threads and starts a thread for at most one
  # user with an unfinished import that isn't already being handled.
  #
  # This method does not return.
  def run
    @user_threads = []

    # The downloader must exist before the firehose thread is started, because
    # that thread calls @downloader.process_items as soon as it finds queued items.
    @downloader = PostDownloader.new
    @firehose_thread = Thread.new { process_firehose_items }

    loop do
      @user_threads.keep_if(&:alive?)

      active_user_ids = @user_threads.map(&:user_id)
      user = User.with_unfinished_import.where.not(id: active_user_ids).first

      if user
        collections = user.imports.unfinished.map(&:collection)
        @user_threads << UserThread.new(user, collections, @verbose)
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep POLL_INTERVAL
    end
  end

  # Loops forever, taking the FIREHOSE_BATCH_SIZE oldest items (by +time+) queued
  # under :firehose across Like, Repost, Pin and Quote and passing them to the
  # downloader. Sleeps for POLL_INTERVAL seconds when nothing is queued.
  #
  # This method does not return.
  def process_firehose_items
    loop do
      # Each type contributes at most one batch worth of its oldest items; the
      # merged list is then re-sorted and trimmed to a single batch.
      items = [Like, Repost, Pin, Quote]
        .flat_map { |type| type.in_queue(:firehose).order('time').limit(FIREHOSE_BATCH_SIZE).to_a }
        .sort_by(&:time)
        .first(FIREHOSE_BATCH_SIZE)

      if items.empty?
        sleep POLL_INTERVAL
      else
        @downloader.process_items(items)
      end
    end
  end
end