Don't forget to lycansubscribe
1require 'didkit'
2require 'minisky'
3
4require_relative '../at_uri'
5require_relative '../models/post'
6require_relative '../models/user'
7
# Shared scaffolding for importers. It resolves the importing user, tracks
# per-collection import state (cursor, started_from, last_completed) on the
# user's import record, and hands posts to a caller-supplied block, queueing
# stubs for posts that are not in the database yet.
#
# Subclasses must implement #import_items. The collection name is derived
# from the subclass name (see #collection).
class BaseImporter
  # item_queue: optional queue (must respond to #push and #length) that
  #   receives the stubs built for posts not found locally.
  # report: optional progress reporter; only #update is called on it.
  attr_accessor :item_queue, :report

  # @param user_did [String] DID of the account being imported; a User row is
  #   created for it when none exists.
  def initialize(user_did)
    @did = DID.new(user_did)
    @user = User.find_or_create_by!(did: user_did)

    # DID => user id lookup cache, pre-seeded with the importing user.
    @uid_cache = { user_did => @user.id }
    @imported_count = 0
  end

  # @return [String] the concrete importer's class name, used as the key in
  #   report updates.
  def importer_name
    self.class.name
  end

  # @return [String] lowercased class name with a trailing "Importer" removed
  #   (e.g. a class named LikesImporter yields "likes").
  def collection
    importer_name.sub(/Importer\z/, '').downcase
  end

  # Runs one import pass for this importer's collection.
  #
  # @param requested_time_limit [Object, nil] explicit limit stored in
  #   @time_limit for #import_items to use. When given, the import record's
  #   started_from / last_completed bookkeeping is left untouched; when nil,
  #   the record's last_completed value is used as the limit instead.
  def run_import(requested_time_limit = nil)
    @minisky = Minisky.new(@did.document.pds_host, nil)
    @import = @user.imports.find_by(collection: collection) || @user.imports.create!(collection: collection)

    # A nil cursor is treated as the start of a fresh pass (presumably a
    # non-nil cursor means an interrupted pass is being resumed - confirm
    # against the subclasses), so the start time is only stamped here.
    if @import.cursor.nil?
      @import.update!(started_from: Time.now) unless requested_time_limit
    end

    @time_limit = requested_time_limit || @import.last_completed
    puts "Fetching until: #{@time_limit}" if @time_limit

    import_items

    # The pass finished: record when it started as the new completion mark,
    # then clear the in-progress state.
    @import.update!(last_completed: @import.started_from) unless requested_time_limit
    @import.update!(cursor: nil, started_from: nil)
    @report&.update(importers: { importer_name => { finished: true } })
  end

  # Fetches and stores the records of this importer's collection.
  # Must be overridden by subclasses.
  #
  # @raise [NotImplementedError] always, in the base class
  def import_items
    raise NotImplementedError, "#{self.class} must implement #import_items"
  end

  # Yields either the stored post or its URI so the caller can build an item.
  #
  # If the URI is not a post URI, nothing happens. If the post exists locally
  # the block receives `{ post: post }`. Otherwise the block receives
  # `{ post_uri: post_uri }` and its return value (the item stub) is pushed
  # onto item_queue when a queue is set.
  #
  # @param uri [String] URI of the referenced post, parsed with AT_URI()
  # @yieldparam args [Hash] `{ post: }` or `{ post_uri: }`
  def create_item_for_post(uri)
    post_uri = AT_URI(uri)
    return unless post_uri.is_post?

    post = Post.find_by(
      user_id: user_id_for_did(post_uri.repo),
      rkey: post_uri.rkey
    )

    if post
      yield({ post: post })
    else
      item_stub = yield({ post_uri: post_uri })

      if @item_queue
        @item_queue.push(item_stub)
        @report&.update(queue: { length: @item_queue.length })
      end
    end
  end

  # Looks up (creating if needed) the user with the given DID and returns its
  # id, caching the result per importer instance.
  #
  # The cache stores ids, matching the entry seeded in #initialize, so every
  # DID yields the same kind of value.
  #
  # @param did [String]
  # @return [Object] the id of the matching User record
  def user_id_for_did(did)
    @uid_cache[did] ||= User.find_or_create_by!(did: did).id
  end
end