Don't forget to lycansubscribe
1require_relative 'base_importer'
2
3class PostsImporter < BaseImporter
4 def import_items
5 params = { repo: @did, collection: 'app.bsky.feed.post', limit: 100 }
6 params[:cursor] = @import.cursor if @import.cursor
7
8 loop do
9 response = @minisky.get_request('com.atproto.repo.listRecords', params)
10
11 records = response['records']
12 cursor = response['cursor']
13 oldest_date = nil
14
15 records.each do |record|
16 begin
17 quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
18 pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)
19
20 record_date = quote&.time || pin&.time || created_at(record['value'])
21 oldest_date = [oldest_date, record_date].compact.min
22
23 if @item_queue
24 if quote && quote.pending?
25 @item_queue.push(quote)
26 end
27
28 if pin && pin.pending?
29 @item_queue.push(pin)
30 end
31
32 @report&.update(queue: { length: @item_queue.length })
33 end
34 rescue InvalidRecordError => e
35 @logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
36 end
37 end
38
39 @imported_count += records.length
40 @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
41 @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
42
43 params[:cursor] = cursor
44 @import.update!(cursor: cursor, fetched_until: oldest_date)
45
46 break if !cursor
47 break if @time_limit && oldest_date && oldest_date < @time_limit
48 end
49 end
50end