Don't forget to lycansubscribe
1require 'active_record'
2require 'minisky'
3require 'time'
4
5require_relative 'init'
6require_relative 'import_manager'
7require_relative 'models/import'
8require_relative 'post_downloader'
9require_relative 'reports/basic_report'
10
# Long-running worker that starts imports for users with unfinished imports
# and, on a separate thread, hands queued firehose items to a PostDownloader.
class ImportWorker
  # Seconds to sleep between polling passes.
  POLL_INTERVAL = 5

  # Maximum number of firehose items fetched per record type and processed per pass.
  FIREHOSE_BATCH_SIZE = 25

  # When truthy, each started import gets a BasicReport attached.
  attr_accessor :verbose

  # Thread that runs the import of the given collections for a single user.
  #
  # NOTE: the `run(collections)` method below shadows the core `Thread#run`
  # on instances of this class; the name is kept so existing callers keep working.
  class UserThread < Thread
    # @param user the user record to import for (must respond to `id`, `did`,
    #   `registered_at` and `update!`)
    # @param collections the collections passed on to `ImportManager#start`
    # @param verbose [Boolean] attach a BasicReport to the import when truthy
    def initialize(user, collections, verbose = false)
      @user = user
      @verbose = verbose

      # Thread subclasses must call super with the block that forms the thread body.
      super() { run(collections) }
    end

    # @return the id of the user this thread is importing for
    def user_id
      @user.id
    end

    # Fills in the user's registration time if it is missing, then starts the import.
    #
    # @param collections the collections passed on to `ImportManager#start`
    def run(collections)
      @user.update!(registered_at: get_registration_time(@user)) if @user.registered_at.nil?

      import = ImportManager.new(@user)
      import.report = BasicReport.new if @verbose
      import.start(collections)
    end

    # Fetches the user's profile from the AppView and parses its `createdAt` field.
    #
    # @param user a record responding to `did`
    # @return [Time] the parsed `createdAt` value
    # @raise whatever `Minisky#get_request` or `Time.parse` raise (e.g. when
    #   `createdAt` is absent from the response)
    def get_registration_time(user)
      host = ENV['APPVIEW_HOST'] || 'public.api.bsky.app'
      sky = Minisky.new(host, nil)
      profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })

      Time.parse(profile['createdAt'])
    end
  end

  # Main loop; never returns. Starts the firehose thread, then repeatedly looks
  # for one user with an unfinished import that has no live thread yet and
  # starts a UserThread for them.
  def run
    @user_threads = []

    # The downloader must exist before the firehose thread is spawned: that
    # thread reads @downloader, and would hit nil if it got there first.
    @downloader = PostDownloader.new
    @firehose_thread = Thread.new { process_firehose_items }

    loop do
      # Forget threads that have finished (or died).
      @user_threads.select!(&:alive?)

      busy_user_ids = @user_threads.map(&:user_id)
      user = User.with_unfinished_import.where.not(id: busy_user_ids).first

      if user
        collections = user.imports.unfinished.map(&:collection)
        @user_threads << UserThread.new(user, collections, @verbose)
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep POLL_INTERVAL
    end
  end

  # Firehose loop; never returns. Takes up to FIREHOSE_BATCH_SIZE queued items
  # of each record type, merges them in ascending `time` order, and passes the
  # first FIREHOSE_BATCH_SIZE of the merged list to the downloader. Sleeps when
  # the queue is empty.
  def process_firehose_items
    loop do
      items = [Like, Repost, Pin, Quote]
        .flat_map { |type| type.in_queue(:firehose).order('time').limit(FIREHOSE_BATCH_SIZE).to_a }
        .sort_by(&:time)
        .first(FIREHOSE_BATCH_SIZE)

      if items.empty?
        sleep POLL_INTERVAL
      else
        @downloader.process_items(items)
      end
    end
  end
end