Don't forget to lycansubscribe
1require 'active_record'
2require 'minisky'
3require 'time'
4
5require_relative 'init'
6require_relative 'import_manager'
7require_relative 'models/import'
8require_relative 'post_downloader'
9require_relative 'reports/basic_report'
10
11class ImportWorker
12 attr_accessor :verbose
13
14 class UserThread < Thread
15 def initialize(user, collections, verbose = false)
16 @user = user
17 @verbose = verbose
18
19 super { run(collections) }
20 end
21
22 def user_id
23 @user.id
24 end
25
26 def run(collections)
27 if @user.registered_at.nil?
28 registration_time = get_registration_time(@user)
29 @user.update!(registered_at: registration_time)
30 end
31
32 import = ImportManager.new(@user)
33 import.report = BasicReport.new if @verbose
34 import.start(collections)
35 end
36
37 def get_registration_time(user)
38 sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
39 profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })
40
41 Time.parse(profile['createdAt'])
42 end
43 end
44
45 def run
46 @user_threads = []
47
48 @firehose_thread = Thread.new { process_firehose_items }
49 @downloader = PostDownloader.new
50
51 loop do
52 @user_threads.delete_if { |t| !t.alive? }
53
54 users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a
55
56 users.each do |user|
57 collections = user.imports.unfinished.map(&:collection)
58 thread = UserThread.new(user, collections, @verbose)
59 @user_threads << thread
60 end
61
62 # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
63 sleep 5
64 end
65 end
66
67 def process_firehose_items
68 loop do
69 items = [Like, Repost, Pin, Quote]
70 .map { |type| type.in_queue(:firehose).order('time').limit(25) }
71 .flatten
72 .sort_by(&:time)
73 .first(25)
74
75 if items.length > 0
76 @downloader.process_items(items)
77 else
78 sleep 5
79 end
80 end
81 end
82end