Don't forget to lycansubscribe
require_relative 'base_importer'

# Importer that walks the records of the 'app.bsky.feed.post' collection in
# the repo identified by @did and hands each record to the user's quotes and
# pins associations for import.
class PostsImporter < BaseImporter
  # Fetches post records page by page (100 per request) through
  # 'com.atproto.repo.listRecords', resuming from @import.cursor when one is
  # stored.
  #
  # For every record on a page:
  # * `import_from_record` is called on both @user.quotes and @user.pins;
  # * the record's date (quote time, else pin time, else `created_at` of the
  #   record value) is folded into the oldest date seen on the current page;
  # * when @item_queue is set, imported items that report `pending?` are
  #   pushed onto it and the queue length is sent to @report.
  #
  # An InvalidRecordError raised while handling a record is logged as a
  # warning and the record is skipped; it is still included in the page count
  # added to @imported_count.
  #
  # After every page the progress is reported and the returned cursor plus the
  # page's oldest date are persisted with `@import.update!`. The loop stops
  # when the response carries no cursor, or when @time_limit is set and the
  # page's oldest date is earlier than it.
  def import_items
    query = { repo: @did, collection: 'app.bsky.feed.post', limit: 100 }

    saved_cursor = @import.cursor
    query[:cursor] = saved_cursor if saved_cursor

    loop do
      page = @minisky.get_request('com.atproto.repo.listRecords', query)

      records = page['records']
      next_cursor = page['cursor']

      # Tracked per page: reset on every request, so it only reflects the
      # records returned by the current call.
      earliest = nil

      records.each do |record|
        uri = record['uri']
        value = record['value']

        begin
          quote = @user.quotes.import_from_record(uri, value, queue: :import)
          pin = @user.pins.import_from_record(uri, value, queue: :import)

          timestamp = quote&.time || pin&.time || created_at(value)
          earliest = [earliest, timestamp].compact.min

          # Queueing is optional; without a queue there is nothing left to do
          # for this record.
          next unless @item_queue

          # Quote first, then pin - same order in which they were imported.
          [quote, pin].each do |item|
            @item_queue.push(item) if item && item.pending?
          end

          @report&.update(queue: { length: @item_queue.length })
        rescue InvalidRecordError => e
          @logger&.warn "Error in PostsImporter: #{uri}: #{e}"
        end
      end

      @imported_count += records.length
      @report&.update(importers: { importer_name => { imported_items: @imported_count } })

      if earliest
        @report&.update(importers: { importer_name => { oldest_date: earliest } })
      end

      # Persist the position so that a later run can resume from this page.
      query[:cursor] = next_cursor
      @import.update!(cursor: next_cursor, fetched_until: earliest)

      break unless next_cursor
      break if @time_limit && earliest && earliest < @time_limit
    end
  end
end