Don't forget to lycansubscribe

generalize code to allow different types of imports

+39 -68
app/importer.rb
··· 2 2 require 'minisky' 3 3 4 4 require_relative 'at_uri' 5 - require_relative 'models/import' 6 - require_relative 'models/like' 7 5 require_relative 'models/post' 8 6 require_relative 'models/user' 9 7 10 8 class Importer 11 - attr_accessor :like_queue, :report 9 + attr_accessor :item_queue, :report 12 10 13 11 def initialize(user_did) 14 12 @did = DID.new(user_did) 15 13 @user = User.find_or_create_by!(did: user_did) 16 14 17 15 @uid_cache = { user_did => @user.id } 16 + @imported_count = 0 18 17 end 19 18 20 - def run_import(time_limit = nil) 21 - @minisky = Minisky.new(@did.document.pds_host, nil) 19 + def importer_name 20 + self.class.name 21 + end 22 22 23 - import_likes(time_limit) 23 + def collection 24 + importer_name.gsub(/Importer$/, '').downcase 24 25 end 25 26 26 - def import_likes(requested_time_limit = nil) 27 - import = @user.import || @user.create_import! 27 + def run_import(requested_time_limit = nil) 28 + @minisky = Minisky.new(@did.document.pds_host, nil) 29 + @import = @user.imports.find_by(collection: collection) || @user.imports.create!(collection: collection) 28 30 29 - params = { repo: @did, collection: 'app.bsky.feed.like', limit: 100 } 30 - 31 - if import.cursor 32 - params[:cursor] = import.cursor 33 - else 34 - import.update!(started_from: Time.now) unless requested_time_limit 31 + if @import.cursor.nil? 32 + @import.update!(started_from: Time.now) unless requested_time_limit 35 33 end 36 34 37 - count = 0 38 - time_limit = requested_time_limit || import.last_completed 35 + @time_limit = requested_time_limit || @import.last_completed 36 + puts "Fetching until: #{@time_limit}" if @time_limit 39 37 40 - puts "Fetching until: #{time_limit}" if time_limit 38 + import_items 41 39 42 - loop do 43 - response = @minisky.get_request('com.atproto.repo.listRecords', params) 40 + @import.update!(last_completed: @import.started_from) unless requested_time_limit 41 + @import.update!(cursor: nil, started_from: nil) 42 + @report&.update(importers: { importer_name => { :finished => true }}) 43 + end 44 44 45 - records = response['records'] 46 - cursor = response['cursor'] 45 + def import_items 46 + raise NotImplementedError 47 + end 47 48 48 - count += records.length 49 - @report&.update(importer: { imported_likes: count }) 50 - @report&.update(importer: { oldest_date: Time.parse(records.last['value']['createdAt']) }) unless records.empty? 49 + def create_item_for_post(uri) 50 + post_uri = AT_URI(uri) 51 + return if post_uri.collection != 'app.bsky.feed.post' 51 52 52 - process_likes(records) 53 - params[:cursor] = cursor 53 + post = Post.find_by( 54 + user_id: user_id_for_did(post_uri.repo), 55 + rkey: post_uri.rkey 56 + ) 54 57 55 - import.update!(cursor: cursor) 58 + if post 59 + yield({ post: post }) 60 + else 61 + item_stub = yield({ post_uri: post_uri }) 56 62 57 - break if !cursor 58 - break if time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < time_limit } 63 + if @item_queue 64 + @item_queue.push(item_stub) 65 + @report&.update(queue: { length: @item_queue.length }) 66 + end 59 67 end 60 - 61 - import.update!(cursor: nil, started_from: nil) 62 - import.update!(last_completed: import.started_from) unless requested_time_limit 63 - 64 - @report&.update(importer: { finished: true }) 65 68 end 66 69 67 - def process_likes(likes) 68 - likes.each do |record| 69 - begin 70 - like_rkey = AT_URI(record['uri']).rkey 71 - next if @user.likes.where(rkey: like_rkey).exists? 72 - 73 - like_time = Time.parse(record['value']['createdAt']) 74 - 75 - post_uri = AT_URI(record['value']['subject']['uri']) 76 - next if post_uri.collection != 'app.bsky.feed.post' 77 - 78 - post_did = post_uri.repo 79 - 80 - if @uid_cache[post_did].nil? 81 - post_author = User.find_or_create_by!(did: post_did) 82 - @uid_cache[post_did] = post_author.id 83 - end 84 - 85 - post = Post.find_by(user_id: @uid_cache[post_did], rkey: post_uri.rkey) 86 - 87 - if post 88 - @user.likes.create!(rkey: like_rkey, time: like_time, post: post) 89 - else 90 - like_stub = @user.likes.create!(rkey: like_rkey, time: like_time, post_uri: post_uri) 91 - 92 - if @like_queue 93 - @like_queue.push(like_stub) 94 - @report&.update(queue: { length: @like_queue.length }) 95 - end 96 - end 97 - rescue StandardError => e 98 - puts "Error in Importer#process_likes: #{record['uri']}: #{e}" 99 - end 100 - end 70 + def user_id_for_did(did) 71 + @uid_cache[did] ||= User.find_or_create_by!(did: did) 101 72 end 102 73 end
+44
app/importers/likes_importer.rb
··· 1 + require 'time' 2 + 3 + require_relative '../at_uri' 4 + require_relative '../importer' 5 + 6 + class LikesImporter < Importer 7 + def import_items 8 + params = { repo: @did, collection: 'app.bsky.feed.like', limit: 100 } 9 + params[:cursor] = @import.cursor if @import.cursor 10 + 11 + loop do 12 + response = @minisky.get_request('com.atproto.repo.listRecords', params) 13 + 14 + records = response['records'] 15 + cursor = response['cursor'] 16 + 17 + @imported_count += records.length 18 + @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 19 + @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty? 20 + 21 + records.each do |record| 22 + begin 23 + like_rkey = AT_URI(record['uri']).rkey 24 + next if @user.likes.where(rkey: like_rkey).exists? 25 + 26 + like_time = Time.parse(record['value']['createdAt']) 27 + post_uri = record['value']['subject']['uri'] 28 + 29 + create_item_for_post(post_uri) do |args| 30 + @user.likes.create!(args.merge(rkey: like_rkey, time: like_time)) 31 + end 32 + rescue StandardError => e 33 + puts "Error in LikesImporter: #{record['uri']}: #{e}" 34 + end 35 + end 36 + 37 + params[:cursor] = cursor 38 + @import.update!(cursor: cursor) 39 + 40 + break if !cursor 41 + break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit } 42 + end 43 + end 44 + end
+5 -5
app/like_queue.rb app/item_queue.rb
··· 1 - class LikeQueue 1 + class ItemQueue 2 2 BATCH_SIZE = 25 3 3 4 - def initialize(likes = []) 4 + def initialize(items = []) 5 5 @mutex = Mutex.new 6 - @queue = likes 6 + @queue = items 7 7 end 8 8 9 - def push(like) 9 + def push(item) 10 10 @mutex.synchronize { 11 - @queue << like 11 + @queue << item 12 12 } 13 13 end 14 14
+3
app/models/import.rb
··· 4 4 5 5 class Import < ActiveRecord::Base 6 6 belongs_to :user 7 + 8 + validates_inclusion_of :collection, in: %w(likes) 9 + validates_uniqueness_of :collection, scope: :user_id 7 10 end
+1 -1
app/models/user.rb
··· 10 10 11 11 has_many :posts 12 12 has_many :likes, foreign_key: 'actor_id' 13 - has_one :import 13 + has_many :imports 14 14 end
+22
db/migrate/20250903215014_add_import_collections.rb
··· 1 + class AddImportCollections < ActiveRecord::Migration[7.2] 2 + def change 3 + add_column :imports, :collection, :string, limit: 20, null: true 4 + 5 + reversible do |dir| 6 + dir.up do 7 + execute "UPDATE imports SET collection = 'likes'" 8 + end 9 + end 10 + 11 + change_column_null :imports, :collection, false 12 + 13 + remove_index :imports, :user_id, unique: true 14 + add_index :imports, [:user_id, :collection], unique: true 15 + 16 + reversible do |dir| 17 + dir.down do 18 + execute "DELETE FROM imports WHERE collection != 'likes'" 19 + end 20 + end 21 + end 22 + end
+3 -2
db/schema.rb
··· 10 10 # 11 11 # It's strongly recommended that you check this file into your version control system. 12 12 13 - ActiveRecord::Schema[7.2].define(version: 2025_08_31_210930) do 13 + ActiveRecord::Schema[7.2].define(version: 2025_09_03_215014) do 14 14 # These are extensions that must be enabled in order to support this database 15 15 enable_extension "plpgsql" 16 16 ··· 19 19 t.string "cursor" 20 20 t.datetime "started_from" 21 21 t.datetime "last_completed" 22 - t.index ["user_id"], name: "index_imports_on_user_id", unique: true 22 + t.string "collection", limit: 20, null: false 23 + t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true 23 24 end 24 25 25 26 create_table "likes", force: :cascade do |t|
+32 -14
lib/tasks/import.rake
··· 1 - require_relative '../../app/importer' 2 - require_relative '../../app/like_queue' 1 + require_relative '../../app/importers/likes_importer' 2 + require_relative '../../app/item_queue' 3 3 require_relative '../../app/post_downloader' 4 4 5 5 class ImportReport ··· 9 9 end 10 10 11 11 def update(data) 12 - data.each do |k, v| 13 - @data[k] ||= {} 14 - @data[k].update(v) 12 + deep_merge(@data, data) 13 + render 14 + end 15 + 16 + def deep_merge(target, updates) 17 + updates.each do |k, v| 18 + if v.is_a?(Hash) 19 + target[k] ||= {} 20 + deep_merge(target[k], v) 21 + else 22 + target[k] = v 23 + end 15 24 end 16 - 17 - render 18 25 end 19 26 20 27 def render 21 28 print " " * 80 + "\r" 22 29 puts "Elapsed time: #{(Time.now - @start).to_i} s" 23 30 24 - print " " * 80 + "\r" 25 - puts "Importer: imported likes = #{@data.dig(:importer, :imported_likes) || 0} (until: #{@data.dig(:importer, :oldest_date)})" + 26 - "#{" (DONE)" if @data.dig(:importer, :finished)}" 31 + importers = @data[:importers] || {} 32 + 33 + importers.each do |name, data| 34 + print " " * 80 + "\r" 35 + puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}" 36 + end 27 37 28 38 print " " * 80 + "\r" 29 39 puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})" ··· 31 41 print " " * 80 + "\r" 32 42 puts "Queue size: #{@data.dig(:queue, :length) || 0}" 33 43 34 - print "\e[4A" 44 + print "\e[#{3 + importers.length}A" 35 45 end 36 46 end 37 47 ··· 40 50 raise "Required USER parameter missing" 41 51 end 42 52 43 - queue = LikeQueue.new(Like.where(post: nil).to_a) 53 + case ENV['COLLECTION'] 54 + when 'likes' 55 + queue = ItemQueue.new(Like.where(post: nil).to_a) 56 + importer = LikesImporter.new(ENV['USER']) 57 + when nil 58 + raise "Required COLLECTION parameter missing" 59 + else 60 + raise "Invalid collection: #{ENV['COLLECTION']}" 61 + end 62 + 44 63 report = ImportReport.new 45 64 46 - importer = Importer.new(ENV['USER']) 47 - importer.like_queue = queue 65 + importer.item_queue = queue 48 66 importer.report = report 49 67 50 68 downloader = PostDownloader.new