Don't forget to lycansubscribe
require 'active_record'
require 'minisky'
require 'time'

require_relative 'init'
require_relative 'import_manager'
require_relative 'models/import'
require_relative 'post_downloader'
require_relative 'reports/basic_report'

# Long-running worker that starts one import thread per user with an
# unfinished import and, on a separate thread, hands items queued from the
# firehose to a PostDownloader.
class ImportWorker
  # AppView host queried for profiles when ENV['APPVIEW_HOST'] is not set.
  DEFAULT_APPVIEW_HOST = 'public.api.bsky.app'

  # Maximum number of firehose items fetched per model and processed per pass.
  FIREHOSE_BATCH_SIZE = 25

  # Seconds to wait between polls, both for new imports and for an empty
  # firehose queue.
  POLL_INTERVAL = 5

  # When truthy, every started import gets a BasicReport attached.
  attr_accessor :verbose

  # Thread that runs the import of the given collections for a single user.
  class UserThread < Thread
    # @param user [User] the user whose records are imported
    # @param collections [Array] collections passed on to ImportManager#start
    # @param verbose [Boolean] attach a BasicReport to the import when truthy
    def initialize(user, collections, verbose = false)
      # Instance state is assigned before calling `super`, because the thread
      # body may begin executing as soon as Thread#initialize is invoked.
      @user = user
      @verbose = verbose

      super { run(collections) }
    end

    # Id of the user this thread works on; the worker uses it to skip users
    # that already have a running thread.
    def user_id
      @user.id
    end

    # Fills in the user's registration time if it is missing, then runs the
    # import for the given collections.
    def run(collections)
      @user.update!(registered_at: get_registration_time(@user)) if @user.registered_at.nil?

      import = ImportManager.new(@user)
      import.report = BasicReport.new if @verbose
      import.start(collections)
    end

    # Looks up the user's profile on the AppView and returns its `createdAt`
    # value parsed into a Time.
    def get_registration_time(user)
      host = ENV.fetch('APPVIEW_HOST', DEFAULT_APPVIEW_HOST)
      sky = Minisky.new(host, nil)
      profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })

      Time.parse(profile['createdAt'])
    end
  end

  # Main loop; never returns. Starts the firehose thread once, then every
  # POLL_INTERVAL seconds drops finished user threads and starts at most one
  # new UserThread for a user with an unfinished import and no running thread.
  def run
    @user_threads = []

    # The downloader has to exist before the firehose thread starts: that
    # thread reads @downloader, so creating it afterwards would leave a window
    # in which the thread could call process_items on nil.
    @downloader = PostDownloader.new
    @firehose_thread = Thread.new { process_firehose_items }

    loop do
      @user_threads.select!(&:alive?)

      busy_user_ids = @user_threads.map(&:user_id)
      user = User.with_unfinished_import.where.not(id: busy_user_ids).first

      if user
        collections = user.imports.unfinished.map(&:collection)
        @user_threads << UserThread.new(user, collections, @verbose)
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep POLL_INTERVAL
    end
  end

  # Endless loop run on the firehose thread. Each pass loads up to
  # FIREHOSE_BATCH_SIZE items ordered by `time` from the firehose queue of
  # each model, merges them, and passes the FIREHOSE_BATCH_SIZE earliest ones
  # to the downloader. Sleeps for POLL_INTERVAL seconds when nothing is queued.
  def process_firehose_items
    loop do
      items = [Like, Repost, Pin, Quote]
        .flat_map { |type| type.in_queue(:firehose).order('time').limit(FIREHOSE_BATCH_SIZE).to_a }
        .sort_by(&:time)
        .first(FIREHOSE_BATCH_SIZE)

      if items.empty?
        sleep POLL_INTERVAL
      else
        @downloader.process_items(items)
      end
    end
  end
end