Don't forget to lycansubscribe
1require 'active_record' 2require 'minisky' 3require 'time' 4 5require_relative 'init' 6require_relative 'import_manager' 7require_relative 'models/import' 8require_relative 'post_downloader' 9require_relative 'reports/basic_report' 10 11class ImportWorker 12 attr_accessor :verbose 13 14 class UserThread < Thread 15 def initialize(user, collections, verbose = false) 16 @user = user 17 @verbose = verbose 18 19 super { run(collections) } 20 end 21 22 def user_id 23 @user.id 24 end 25 26 def run(collections) 27 if @user.registered_at.nil? 28 registration_time = get_registration_time(@user) 29 @user.update!(registered_at: registration_time) 30 end 31 32 import = ImportManager.new(@user) 33 import.report = BasicReport.new if @verbose 34 import.start(collections) 35 end 36 37 def get_registration_time(user) 38 sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil) 39 profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did }) 40 41 Time.parse(profile['createdAt']) 42 end 43 end 44 45 def run 46 @user_threads = [] 47 48 @firehose_thread = Thread.new { process_firehose_items } 49 @downloader = PostDownloader.new 50 51 loop do 52 @user_threads.delete_if { |t| !t.alive? } 53 54 users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a 55 56 users.each do |user| 57 collections = user.imports.unfinished.map(&:collection) 58 thread = UserThread.new(user, collections, @verbose) 59 @user_threads << thread 60 end 61 62 # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify 63 sleep 5 64 end 65 end 66 67 def process_firehose_items 68 loop do 69 items = [Like, Repost, Pin, Quote] 70 .map { |type| type.in_queue(:firehose).order('time').limit(25) } 71 .flatten 72 .sort_by(&:time) 73 .first(25) 74 75 if items.length > 0 76 @downloader.process_items(items) 77 else 78 sleep 5 79 end 80 end 81 end 82end