Don't forget to lycansubscribe
1require 'active_record'
2require 'minisky'
3require 'time'
4
5require_relative 'init'
6require_relative 'import_manager'
7require_relative 'models/import'
8require_relative 'post_downloader'
9require_relative 'reports/basic_report'
10
11class ImportWorker
12 attr_accessor :verbose
13
14 class UserThread < Thread
15 def initialize(user, collections, verbose = false)
16 @user = user
17 @verbose = verbose
18
19 super { run(collections) }
20 end
21
22 def user_id
23 @user.id
24 end
25
26 def run(collections)
27 if @user.registered_at.nil?
28 registration_time = get_registration_time(@user)
29 @user.update!(registered_at: registration_time)
30 end
31
32 import = ImportManager.new(@user)
33 import.report = BasicReport.new if @verbose
34 import.start(collections)
35 end
36
37 def get_registration_time(user)
38 profile = Minisky.new('public.api.bsky.app', nil).get_request('app.bsky.actor.getProfile', { actor: user.did })
39 Time.parse(profile['createdAt'])
40 end
41 end
42
43 def run
44 @user_threads = []
45
46 @firehose_thread = Thread.new { process_firehose_items }
47 @downloader = PostDownloader.new
48
49 loop do
50 @user_threads.delete_if { |t| !t.alive? }
51
52 if user = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).first
53 collections = user.imports.unfinished.map(&:collection)
54 thread = UserThread.new(user, collections, @verbose)
55 @user_threads << thread
56 end
57
58 # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
59 sleep 5
60 end
61 end
62
63 def process_firehose_items
64 loop do
65 items = [Like, Repost, Pin, Quote]
66 .map { |type| type.in_queue(:firehose).order('time').limit(25) }
67 .flatten
68 .sort_by(&:time)
69 .first(25)
70
71 if items.length > 0
72 @downloader.process_items(items)
73 else
74 sleep 5
75 end
76 end
77 end
78end