Don't forget to lycansubscribe

added worker for background imports

+12 -2
app/firehose_client.rb
··· 1 require 'skyfall' 2 3 require_relative 'init' 4 require_relative 'models/post' 5 require_relative 'models/subscription' 6 require_relative 'models/user' ··· 48 @replaying = true 49 @last_update = Time.now 50 51 - @timer ||= EM::PeriodicTimer.new(20) do 52 now = Time.now 53 diff = now - @last_update 54 ··· 56 log "Timer: last update #{sprintf('%.1f', diff)}s ago" 57 end 58 end 59 } 60 61 @sky.on_disconnect { ··· 135 if msg.status == :deleted 136 if user = User.find_by(did: msg.repo) 137 user.destroy 138 end 139 end 140 end ··· 182 end 183 184 def inspect 185 - vars = instance_variables - [:@timer] 186 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 187 "#<#{self.class}:0x#{object_id} #{values}>" 188 end
··· 1 require 'skyfall' 2 3 require_relative 'init' 4 + require_relative 'models/import_job' 5 require_relative 'models/post' 6 require_relative 'models/subscription' 7 require_relative 'models/user' ··· 49 @replaying = true 50 @last_update = Time.now 51 52 + @live_check_timer ||= EM::PeriodicTimer.new(20) do 53 now = Time.now 54 diff = now - @last_update 55 ··· 57 log "Timer: last update #{sprintf('%.1f', diff)}s ago" 58 end 59 end 60 + 61 + @jobs_timer ||= EM::PeriodicTimer.new(3) do 62 + ImportJob.all.each do |job| 63 + @active_users[job.user.did] = job.user 64 + job.create_imports 65 + job.destroy 66 + end 67 + end 68 } 69 70 @sky.on_disconnect { ··· 144 if msg.status == :deleted 145 if user = User.find_by(did: msg.repo) 146 user.destroy 147 + @active_users.delete_if { |u| u.id == user.id } 148 end 149 end 150 end ··· 192 end 193 194 def inspect 195 + vars = instance_variables - [:@jobs_timer, :@live_check_timer] 196 values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ") 197 "#<#{self.class}:0x#{object_id} #{values}>" 198 end
+42
app/import_worker.rb
···
··· 1 + require 'active_record' 2 + 3 + require_relative 'init' 4 + require_relative 'import_manager' 5 + require_relative 'models/import' 6 + 7 + class ImportWorker 8 + class UserThread < Thread 9 + def initialize(user, collections) 10 + @user = user 11 + super { run(collections) } 12 + end 13 + 14 + def user_id 15 + @user.id 16 + end 17 + 18 + def run(collections) 19 + import = ImportManager.new(@user) 20 + import.start(collections, false) 21 + end 22 + end 23 + 24 + def run 25 + @user_threads = [] 26 + 27 + loop do 28 + @user_threads.delete_if { |t| !t.alive? } 29 + 30 + ActiveRecord::Base.transaction do 31 + if user = User.with_unfinished_imports.where.not(id: @user_threads.map(&:user_id)).first 32 + collections = user.imports.unfinished.map(&:collection) 33 + thread = UserThread.new(user, collections) 34 + @user_threads << thread 35 + end 36 + end 37 + 38 + # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify 39 + sleep 5 40 + end 41 + end 42 + end
+2
app/models/import.rb
··· 7 8 validates_inclusion_of :collection, in: %w(likes reposts posts) 9 validates_uniqueness_of :collection, scope: :user_id 10 end
··· 7 8 validates_inclusion_of :collection, in: %w(likes reposts posts) 9 validates_uniqueness_of :collection, scope: :user_id 10 + 11 + scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') } 12 end
+13
app/models/import_job.rb
···
··· 1 + require 'active_record' 2 + 3 + require_relative 'user' 4 + 5 + class ImportJob < ActiveRecord::Base 6 + belongs_to :user 7 + 8 + def create_imports 9 + %w(likes reposts posts).each do |group| 10 + self.user.imports.find_or_create_by!(collection: group) 11 + end 12 + end 13 + end
+10
app/models/user.rb
··· 1 require 'active_record' 2 3 require_relative 'import' 4 require_relative 'like' 5 require_relative 'quote' 6 require_relative 'pin' ··· 13 14 has_many :posts 15 has_many :imports, dependent: :delete_all 16 17 before_destroy :delete_posts_cascading 18 ··· 54 55 def self.active 56 self.joins(:imports).distinct 57 end 58 59 def all_pending_items
··· 1 require 'active_record' 2 3 require_relative 'import' 4 + require_relative 'import_job' 5 require_relative 'like' 6 require_relative 'quote' 7 require_relative 'pin' ··· 14 15 has_many :posts 16 has_many :imports, dependent: :delete_all 17 + has_one :import_job, dependent: :delete 18 19 before_destroy :delete_posts_cascading 20 ··· 56 57 def self.active 58 self.joins(:imports).distinct 59 + end 60 + 61 + def self.with_unfinished_imports 62 + self.joins(:imports).merge(Import.unfinished).distinct 63 + end 64 + 65 + def active? 66 + imports.exists? 67 end 68 69 def all_pending_items
+26
bin/worker
···
··· 1 + #!/usr/bin/env ruby 2 + 3 + require 'bundler/setup' 4 + require_relative '../app/import_worker' 5 + 6 + $stdout.sync = true 7 + 8 + if ENV['ARLOG'] == '1' 9 + ActiveRecord::Base.logger = Logger.new(STDOUT) 10 + else 11 + ActiveRecord::Base.logger = nil 12 + end 13 + 14 + worker = ImportWorker.new 15 + 16 + trap("SIGINT") { 17 + puts "Stopping..." 18 + exit 19 + } 20 + 21 + trap("SIGTERM") { 22 + puts "Shutting down the service..." 23 + exit 24 + } 25 + 26 + worker.run
+9
db/migrate/20250920182018_add_import_jobs.rb
···
··· 1 + class AddImportJobs < ActiveRecord::Migration[7.2] 2 + def change 3 + create_table :import_jobs do |t| 4 + t.integer "user_id", null: false 5 + end 6 + 7 + add_index :import_jobs, :user_id, unique: true 8 + end 9 + end
+6 -1
db/schema.rb
··· 10 # 11 # It's strongly recommended that you check this file into your version control system. 12 13 - ActiveRecord::Schema[7.2].define(version: 2025_09_18_024627) do 14 # These are extensions that must be enabled in order to support this database 15 enable_extension "plpgsql" 16 17 create_table "imports", force: :cascade do |t| 18 t.integer "user_id", null: false
··· 10 # 11 # It's strongly recommended that you check this file into your version control system. 12 13 + ActiveRecord::Schema[7.2].define(version: 2025_09_20_182018) do 14 # These are extensions that must be enabled in order to support this database 15 enable_extension "plpgsql" 16 + 17 + create_table "import_jobs", force: :cascade do |t| 18 + t.integer "user_id", null: false 19 + t.index ["user_id"], name: "index_import_jobs_on_user_id", unique: true 20 + end 21 22 create_table "imports", force: :cascade do |t| 23 t.integer "user_id", null: false
+17
lib/tasks/import.rake
··· 4 require_relative '../../app/models/user' 5 require_relative '../../app/post_downloader' 6 7 task :import_user do 8 unless ENV['DID'] 9 raise "Required DID parameter missing"
··· 4 require_relative '../../app/models/user' 5 require_relative '../../app/post_downloader' 6 7 + task :enqueue_user do 8 + unless ENV['DID'] 9 + raise "Required DID parameter missing" 10 + end 11 + 12 + user = User.find_or_create_by!(did: ENV['DID']) 13 + 14 + if user.import_job 15 + puts "Import for #{user.did} is already scheduled." 16 + elsif user.active? 17 + puts "Import for #{user.did} has already started." 18 + else 19 + user.create_import_job! 20 + puts "Import for #{user.did} scheduled ✓" 21 + end 22 + end 23 + 24 task :import_user do 25 unless ENV['DID'] 26 raise "Required DID parameter missing"