Don't forget to lycansubscribe
1require 'active_record'
2require 'minisky'
3require 'time'
4
5require_relative 'init'
6require_relative 'import_manager'
7require_relative 'models/import'
8require_relative 'post_downloader'
9require_relative 'reports/basic_report'
10
# Long-running worker that drives imports.
#
# Two kinds of work run concurrently:
# * one UserThread per user with unfinished imports, each running an ImportManager
#   over that user's unfinished collections;
# * a single firehose thread that feeds queued Like/Repost/Pin/Quote records to a
#   PostDownloader in batches, oldest first.
class ImportWorker
  # Maximum number of queued firehose records handed to the downloader at once.
  FIREHOSE_BATCH_SIZE = 25

  # Seconds to wait between polls of the user table and of an empty firehose queue.
  POLL_INTERVAL = 5

  attr_accessor :verbose, :logger

  # A thread that runs the import for a single user.
  #
  # The import body lives in the private #run_import instead of #run, so the built-in
  # Thread#run (which wakes a sleeping thread) is not shadowed by an incompatible method.
  class UserThread < Thread
    # @param user [User] user whose imports are run
    # @param collections [Array] collections of the user's unfinished imports
    # @param logger [Logger, nil] receives progress messages; nil disables logging
    # @param verbose [Boolean] when true and a logger is given, also attach a BasicReport
    def initialize(user, collections, logger, verbose = false)
      @user = user
      @verbose = verbose
      @logger = logger

      super { run_import(collections) }
    end

    # @return id of the user this thread imports; ImportWorker#run uses it to avoid
    #   starting a second thread for a user that already has a live one.
    def user_id
      @user.id
    end

    # Fetches the account creation time from the AppView's getProfile endpoint.
    #
    # NOTE(review): if the profile has no 'createdAt', Time.parse(nil) raises TypeError.
    # The thread then dies and ImportWorker#run presumably picks the user up again on
    # its next pass, repeating the request - confirm whether that case can occur.
    #
    # @param user [User] user whose `did` is looked up
    # @return [Time]
    def get_registration_time(user)
      sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
      profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })

      Time.parse(profile['createdAt'])
    end

    private

    # Thread body: backfills the user's registration time if it is missing, then runs
    # an ImportManager over the given collections.
    def run_import(collections)
      @logger&.info "Starting import thread for #{@user}"

      @user.update!(registered_at: get_registration_time(@user)) if @user.registered_at.nil?

      import = ImportManager.new(@user)

      if @logger
        import.report = BasicReport.new(@logger) if @verbose
        import.logger = @logger
        import.log_status_updates = true
      end

      import.start(collections)

      @logger&.info "Ended import thread for #{@user}"
    end
  end

  # Main loop; never returns under normal operation.
  #
  # Starts the firehose thread. Then, every POLL_INTERVAL seconds, it drops finished
  # user threads and starts a UserThread for each user with unfinished imports that
  # does not already have a live one.
  def run
    @user_threads = []

    # The downloader must exist (with its logger set) before the firehose thread
    # starts, because that thread calls @downloader.process_items immediately.
    @downloader = PostDownloader.new
    @downloader.logger = @logger
    @firehose_thread = Thread.new { process_firehose_items }

    loop do
      @user_threads.keep_if(&:alive?)

      busy_user_ids = @user_threads.map(&:user_id)
      users = User.with_unfinished_import.where.not(id: busy_user_ids).to_a

      users.each do |user|
        collections = user.imports.unfinished.map(&:collection)
        @user_threads << UserThread.new(user, collections, @logger, @verbose)
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep POLL_INTERVAL
    end
  end

  # Firehose consumer loop.
  #
  # Each pass takes the FIREHOSE_BATCH_SIZE oldest queued records across all four record
  # types. It fetches that many of the oldest from each type, merges them, and sorts by
  # `time`, so the batch is globally oldest-first without a cross-table query. The batch
  # is handed to the downloader; when nothing is queued, the loop sleeps POLL_INTERVAL seconds.
  #
  # NOTE(review): an exception raised here ends the firehose thread, and #run never
  # restarts it.
  def process_firehose_items
    loop do
      items = [Like, Repost, Pin, Quote]
        .flat_map { |type| type.in_queue(:firehose).order('time').limit(FIREHOSE_BATCH_SIZE).to_a }
        .sort_by(&:time)
        .first(FIREHOSE_BATCH_SIZE)

      if items.empty?
        sleep POLL_INTERVAL
      else
        @downloader.process_items(items)
      end
    end
  end
end