# Don't forget to lycansubscribe
require 'didkit'
require 'minisky'

require_relative '../at_uri'
require_relative '../models/post'
require_relative '../models/user'

# Base class for importers that pull one collection of a user's records.
#
# Subclasses must implement #import_items. The collection name is derived
# from the subclass name by stripping a trailing "Importer" and lowercasing
# the rest (see #collection).
class BaseImporter
  # item_queue: optional queue that receives item stubs for posts that are
  #             not stored locally yet (see #create_item_for_post).
  # report:     optional object that is sent progress updates via #update.
  attr_accessor :item_queue, :report

  # @param user_did [String] DID of the user whose records are imported;
  #   the matching User row is looked up or created.
  def initialize(user_did)
    @did = DID.new(user_did)
    @user = User.find_or_create_by!(did: user_did)

    # DID => user id cache, pre-seeded with the importing user.
    @uid_cache = { user_did => @user.id }
    @imported_count = 0
  end

  # Name used as the key for this importer in report updates.
  def importer_name
    self.class.name
  end

  # Collection name: the class name without a trailing "Importer", lowercased.
  def collection
    importer_name.sub(/Importer\z/, '').downcase
  end

  # Runs the import, then resets the stored cursor and marks the importer
  # as finished in the report (if one is set).
  #
  # @param requested_time_limit [Object, nil] explicit limit assigned to
  #   @time_limit; when nil, the import record's last_completed is used and
  #   the record's started_from / last_completed fields are maintained.
  def run_import(requested_time_limit = nil)
    @minisky = Minisky.new(@did.document.pds_host, nil)
    @import = @user.imports.find_or_create_by!(collection: collection)

    # Record the start time only when there is no stored cursor and the
    # caller did not request an explicit time limit.
    if @import.cursor.nil? && !requested_time_limit
      @import.update!(started_from: Time.now)
    end

    @time_limit = requested_time_limit || @import.last_completed
    puts "Fetching until: #{@time_limit}" if @time_limit

    import_items

    @import.update!(last_completed: @import.started_from) unless requested_time_limit
    @import.update!(cursor: nil, started_from: nil)
    @report&.update(importers: { importer_name => { finished: true }})
  end

  # Performs the actual import; must be overridden by subclasses.
  #
  # @raise [NotImplementedError] always, in the base class
  def import_items
    raise NotImplementedError
  end

  # Yields a hash describing the post referenced by +uri+ so that the caller
  # can build an item for it.
  #
  # Does nothing if the URI is not a post URI. If the post is found locally,
  # yields `{ post: post }`. Otherwise yields `{ post_uri: post_uri }`, and
  # if an item queue is set, pushes the block's return value onto it and
  # reports the new queue length.
  #
  # @param uri [String] URI of the referenced record, parsed with AT_URI()
  def create_item_for_post(uri)
    post_uri = AT_URI(uri)
    return unless post_uri.is_post?

    post = Post.find_by(
      user_id: user_id_for_did(post_uri.repo),
      rkey: post_uri.rkey
    )

    if post
      yield({ post: post })
    else
      item_stub = yield({ post_uri: post_uri })

      if @item_queue
        @item_queue.push(item_stub)
        @report&.update(queue: { length: @item_queue.length })
      end
    end
  end

  # Returns the id of the User with the given DID, creating the user if
  # needed. Results are memoized in @uid_cache.
  #
  # The cache must hold ids (not User records) to stay consistent with the
  # entry seeded in #initialize and with the `user_id:` lookup above.
  #
  # @param did [String]
  # @return the user's id
  def user_id_for_did(did)
    @uid_cache[did] ||= User.find_or_create_by!(did: did).id
  end
end