Don't forget to lycansubscribe
1require 'time'
2require_relative 'base_importer'
3
# Importer that walks the account's `app.bsky.feed.post` collection through
# `com.atproto.repo.listRecords` and hands every record to the user's quote
# and pin importers.
class PostsImporter < BaseImporter
  # Fetches post records page by page (100 per request), resuming from the
  # cursor saved on @import if there is one.
  #
  # For every page it:
  # * adds the page size to @imported_count and reports progress to @report
  #   (when a report object is set),
  # * passes each record to @user.quotes / @user.pins `import_from_record`,
  #   pushing results that are still pending onto @item_queue (when set),
  # * saves the new cursor on @import.
  #
  # The loop ends when the response carries no cursor, or when @time_limit is
  # set and the page contains a record created before it.
  #
  # A record whose `createdAt` is missing or unparseable is logged and ignored
  # for the date-based steps instead of aborting the whole import, in line
  # with how per-record import failures are already handled.
  def import_items
    params = { repo: @did, collection: 'app.bsky.feed.post', limit: 100 }
    params[:cursor] = @import.cursor if @import.cursor

    loop do
      response = @minisky.get_request('com.atproto.repo.listRecords', params)
      records = response['records']
      cursor = response['cursor']

      @imported_count += records.length
      report_progress(records)

      records.each { |record| import_record(record) }

      # Persist the position only after the page has been processed.
      # NOTE(review): on the final page this stores a nil cursor - presumably
      # intended as "nothing left to resume"; confirm against BaseImporter.
      params[:cursor] = cursor
      @import.update!(cursor: cursor)

      break unless cursor
      break if reached_time_limit?(records)
    end
  end

  private

  # Sends the running item count and, when it can be determined, the
  # timestamp of the last record of the page to the report object.
  def report_progress(records)
    return unless @report

    @report.update(importers: { importer_name => { imported_items: @imported_count } })
    return if records.empty?

    oldest_date = created_at_of(records.last)
    @report.update(importers: { importer_name => { oldest_date: oldest_date } }) if oldest_date
  end

  # Imports a single record as a quote and as a pin and queues the pending
  # results. Any StandardError is logged and swallowed so that one bad record
  # does not stop the rest of the page.
  def import_record(record)
    quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
    pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)
    return unless @item_queue

    # import_from_record may return nothing for a record, hence the guard.
    [quote, pin].each do |item|
      @item_queue.push(item) if item && item.pending?
    end

    @report&.update(queue: { length: @item_queue.length })
  rescue StandardError => e
    puts "Error in PostsImporter: #{record['uri']}: #{e}"
  end

  # True when a time limit is set and at least one record on the page has a
  # readable `createdAt` earlier than it. Records with unreadable timestamps
  # never trigger the limit.
  def reached_time_limit?(records)
    return false unless @time_limit

    records.any? do |record|
      created_at = created_at_of(record)
      created_at && created_at < @time_limit
    end
  end

  # Parses the record's `value.createdAt` into a Time. Returns nil (after
  # logging) when the field is absent, not a string, or not a parseable date.
  def created_at_of(record)
    Time.parse(record.dig('value', 'createdAt'))
  rescue ArgumentError, RangeError, TypeError => e
    puts "Error in PostsImporter: #{record['uri']}: invalid createdAt: #{e}"
    nil
  end
end
47end