Don't forget to lycansubscribe

added logging for the workers/importers

+19 -3
app/import_manager.rb
··· 5 5 require_relative 'post_downloader' 6 6 7 7 class ImportManager 8 - attr_accessor :report, :time_limit 8 + attr_accessor :report, :time_limit, :logger, :log_status_updates 9 9 10 10 def initialize(user) 11 11 @user = user ··· 39 39 40 40 downloader = PostDownloader.new 41 41 downloader.report = @report 42 + downloader.logger = @logger 42 43 43 - download_thread = Thread.new { downloader.import_from_queue(queue) } 44 + download_thread = Thread.new do 45 + @logger&.info "Starting downloader thread for #{@user}" if @log_status_updates 46 + 47 + downloader.import_from_queue(queue) 48 + 49 + @logger&.info "Ended downloader thread for #{@user}" if @log_status_updates 50 + end 44 51 45 52 import_threads = importers.map do |import| 46 53 import.item_queue = queue 47 54 import.report = @report 55 + import.logger = @logger 48 56 49 - Thread.new { import.run_import(@time_limit) } 57 + Thread.new do 58 + @logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates 59 + 60 + import.run_import(@time_limit) 61 + 62 + @logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates 63 + end 50 64 end 51 65 52 66 import_threads.each { |i| i.join } 67 + 68 + @logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates 53 69 54 70 downloader.stop_when_empty = true 55 71 download_thread.join
+16 -4
app/import_worker.rb
··· 9 9 require_relative 'reports/basic_report' 10 10 11 11 class ImportWorker 12 - attr_accessor :verbose 12 + attr_accessor :verbose, :logger 13 13 14 14 class UserThread < Thread 15 - def initialize(user, collections, verbose = false) 15 + def initialize(user, collections, logger, verbose = false) 16 16 @user = user 17 17 @verbose = verbose 18 + @logger = logger 18 19 19 20 super { run(collections) } 20 21 end ··· 24 25 end 25 26 26 27 def run(collections) 28 + @logger&.info "Starting import thread for #{@user}" 29 + 27 30 if @user.registered_at.nil? 28 31 registration_time = get_registration_time(@user) 29 32 @user.update!(registered_at: registration_time) 30 33 end 31 34 32 35 import = ImportManager.new(@user) 33 - import.report = BasicReport.new if @verbose 36 + 37 + if @logger 38 + import.report = BasicReport.new(@logger) if @verbose 39 + import.logger = @logger 40 + import.log_status_updates = true 41 + end 42 + 34 43 import.start(collections) 44 + 45 + @logger&.info "Ended import thread for #{@user}" 35 46 end 36 47 37 48 def get_registration_time(user) ··· 47 58 48 59 @firehose_thread = Thread.new { process_firehose_items } 49 60 @downloader = PostDownloader.new 61 + @downloader.logger = @logger 50 62 51 63 loop do 52 64 @user_threads.delete_if { |t| !t.alive? } ··· 55 67 56 68 users.each do |user| 57 69 collections = user.imports.unfinished.map(&:collection) 58 - thread = UserThread.new(user, collections, @verbose) 70 + thread = UserThread.new(user, collections, @logger, @verbose) 59 71 @user_threads << thread 60 72 end 61 73
+2 -2
app/importers/base_importer.rb
··· 8 8 require_relative '../models/user' 9 9 10 10 class BaseImporter 11 - attr_accessor :item_queue, :report 11 + attr_accessor :item_queue, :report, :logger 12 12 13 13 def initialize(user) 14 14 @did = DID.new(user.did) ··· 33 33 end 34 34 35 35 @time_limit = requested_time_limit || @import.last_completed 36 - puts "Fetching until: #{@time_limit}" if @time_limit 36 + @logger&.info "Fetching until: #{@time_limit}" if @time_limit 37 37 38 38 import_items 39 39
+1 -1
app/importers/likes_importer.rb
··· 24 24 @report&.update(queue: { length: @item_queue.length }) 25 25 end 26 26 rescue InvalidRecordError => e 27 - puts "Error in LikesImporter: #{record['uri']}: #{e}" 27 + @logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}" 28 28 end 29 29 end 30 30
+1 -1
app/importers/posts_importer.rb
··· 32 32 @report&.update(queue: { length: @item_queue.length }) 33 33 end 34 34 rescue InvalidRecordError => e 35 - puts "Error in PostsImporter: #{record['uri']}: #{e}" 35 + @logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}" 36 36 end 37 37 end 38 38
+1 -1
app/importers/reposts_importer.rb
··· 24 24 @report&.update(queue: { length: @item_queue.length }) 25 25 end 26 26 rescue InvalidRecordError => e 27 - puts "Error in RepostsImporter: #{record['uri']}: #{e}" 27 + @logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}" 28 28 end 29 29 end 30 30
+4
app/models/user.rb
··· 89 89 90 90 Post.where(user: self).delete_all 91 91 end 92 + 93 + def to_s 94 + %(<User id: #{id}, did: "#{did}">) 95 + end 92 96 end
+8 -8
app/post_downloader.rb
··· 6 6 require_relative 'models/user' 7 7 8 8 class PostDownloader 9 - attr_accessor :report, :stop_when_empty 9 + attr_accessor :report, :logger, :stop_when_empty 10 10 11 11 def initialize 12 12 @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil) ··· 60 60 if post.valid? 61 61 current_items.each { |i| update_item(i, post) } 62 62 else 63 - puts "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}" 63 + @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}" 64 64 current_items.each { |i| invalidate_item(i) } 65 65 end 66 66 rescue InvalidRecordError => e 67 - puts "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}" 67 + @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}" 68 68 current_items.each { |i| i.update!(queue: nil) } 69 69 end 70 70 end 71 71 72 72 check_missing_items(items) 73 73 rescue StandardError => e 74 - puts "Error in PostDownloader: #{e.class}: #{e}" 74 + @logger&.warn "Error in PostDownloader: #{e.class}: #{e}" 75 75 end 76 76 end 77 77 ··· 149 149 case status 150 150 when :active 151 151 # account is active but wasn't returned in getProfiles, probably was suspended on the AppView 152 - puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down" 152 + # puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down" 153 153 item.destroy 154 154 when nil 155 155 # account was deleted, so all posts were deleted too 156 - puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted" 156 + # puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted" 157 157 item.destroy 158 158 else 159 159 # account is inactive/suspended, but could come back, so leave it for now 160 - puts "#{item.post_uri}: account #{did} is inactive: #{status}" 160 + # puts "#{item.post_uri}: account #{did} is inactive: #{status}" 161 161 end 162 162 rescue StandardError => e 163 163 hostname = did_obj.document.pds_host rescue "???" 164 - puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}" 164 + @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}" 165 165 166 166 # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023) 167 167 item.destroy if hostname == 'bsky.social'
+6 -2
app/reports/basic_report.rb
··· 1 1 class BasicReport 2 + def initialize(logger) 3 + @logger = logger 4 + end 5 + 2 6 def update(data) 3 7 data.each do |k, v| 4 8 if k == :downloader 5 - p ({ k => v }) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0 9 + @logger.info({ k => v }.inspect) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0 6 10 elsif k == :queue 7 11 next 8 12 else 9 - p ({ k => v}) 13 + @logger.info({ k => v}.inspect) 10 14 end 11 15 end 12 16 end
+9
app/reports/simple_logger.rb
··· 1 + require 'logger' 2 + 3 + class SimpleLogger < Logger 4 + def initialize 5 + super(STDOUT) 6 + 7 + self.formatter = proc { |level, time, prog, msg| "[#{time}] #{msg}\n" } 8 + end 9 + end
+7 -2
bin/worker
··· 2 2 3 3 require 'bundler/setup' 4 4 require_relative '../app/import_worker' 5 + require_relative '../app/reports/simple_logger' 5 6 6 7 $stdout.sync = true 7 8 ··· 17 18 puts " -v = verbose" 18 19 end 19 20 21 + logger = SimpleLogger.new 20 22 worker = ImportWorker.new 23 + worker.logger = logger 21 24 22 25 args = ARGV.dup 23 26 ··· 36 39 end 37 40 38 41 trap("SIGINT") { 39 - puts "Stopping..." 42 + puts 43 + puts "[#{Time.now}] Stopping..." 40 44 exit 41 45 } 42 46 43 47 trap("SIGTERM") { 44 - puts "Shutting down the service..." 48 + puts "[#{Time.now}] Shutting down the service..." 45 49 exit 46 50 } 47 51 52 + puts "[#{Time.now}] Starting background worker..." 48 53 worker.run
+2
lib/tasks/import.rake
··· 3 3 require_relative '../../app/models/user' 4 4 require_relative '../../app/post_downloader' 5 5 require_relative '../../app/reports/console_report' 6 + require_relative '../../app/reports/simple_logger' 6 7 7 8 task :enqueue_user do 8 9 unless ENV['DID'] ··· 34 35 35 36 import = ImportManager.new(user) 36 37 import.report = ConsoleReport.new 38 + import.logger = SimpleLogger.new 37 39 import.time_limit = ENV['UNTIL'] 38 40 39 41 trap("SIGINT") {