require 'didkit'
require 'json'
require 'minisky'
require 'set'
require 'time'

require_relative 'at_uri'
require_relative 'models/post'
require_relative 'models/user'

# Fetches posts referenced by queued items from a Bluesky AppView
# (public.api.bsky.app unless APPVIEW_HOST is set), stores them as Post rows
# and links each queued item to its Post (or drops items whose post is gone).
class PostDownloader
  attr_accessor :report, :logger, :stop_when_empty

  def initialize
    @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
    @total_count = 0
    @oldest_imported = Time.now
    @account_status_cache = {}
  end

  # Repeatedly pops batches from +queue+ and processes them.
  #
  # When a popped batch is empty, returns if +stop_when_empty+ is set,
  # otherwise sleeps for one second and polls again.
  #
  # @param queue [#pop_batch, #length] NOTE(review): presumably yields items
  #   responding to #post_uri/#update!/#destroy — confirm against the queue class.
  def import_from_queue(queue)
    loop do
      items = queue.pop_batch

      if items.empty?
        return if @stop_when_empty

        sleep 1
        next
      end

      @report&.update(queue: { length: queue.length })
      process_items(items)
    end
  end

  # Resolves a batch of queued items to Post records.
  #
  # Items whose post already exists locally are linked straight away. The rest
  # are fetched with app.bsky.feed.getPosts, saved and linked; items the
  # AppView does not return are passed to #check_missing_items.
  #
  # Any StandardError raised while talking to the AppView (or not handled
  # per-post) is logged and swallowed, leaving the affected items untouched.
  # The +items+ array passed in is not modified.
  #
  # @param items [Array] queued items with a +post_uri+
  def process_items(items)
    rkeys = items.map { |item| AT_URI(item.post_uri).rkey }
    existing_posts = Post.where(rkey: rkeys).to_a

    # rkeys are only unique per repo, so match on the full AT URI;
    # ||= keeps the first match, like the former linear search did
    posts_by_uri = existing_posts.each_with_object({}) do |existing, index|
      index[existing.at_uri] ||= existing
    end

    found, pending = items.partition { |item| posts_by_uri.key?(item.post_uri) }
    found.each { |item| update_item(item, posts_by_uri[item.post_uri]) }

    return if pending.empty?

    begin
      response = @sky.get_request('app.bsky.feed.getPosts', { uris: pending.map(&:post_uri).uniq })

      response['posts'].each do |data|
        current_items, pending = pending.partition { |item| item.post_uri == data['uri'] }

        begin
          post = save_post(data['uri'], data['record'])

          if post.valid?
            current_items.each { |i| update_item(i, post) }
          else
            @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
            current_items.each { |i| invalidate_item(i) }
          end
        rescue InvalidRecordError => e
          @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
          # take the items off the queue without linking them to a post
          current_items.each { |i| i.update!(queue: nil) }
        end
      end

      # whatever is left was not returned by getPosts
      check_missing_items(pending)
    rescue StandardError => e
      @logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
    end
  end

  # Returns the Post for +post_uri+, creating the author User and the Post if needed.
  #
  # If a Post with this author and rkey already exists it is returned as-is.
  # Otherwise a new Post is built and saved; the returned Post may be invalid
  # (unsaved) if validation failed, so callers should check #valid?.
  #
  # @raise [InvalidRecordError] if the author cannot be created or the record
  #   cannot be turned into a Post
  def save_post(post_uri, record)
    did, _, rkey = AT_URI(post_uri)

    begin
      author = User.find_or_create_by!(did: did)
    rescue ActiveRecord::RecordInvalid
      raise InvalidRecordError
    end

    existing = Post.find_by(user: author, rkey: rkey)
    return existing if existing

    post = build_post(author, rkey, record)
    post.save
    post
  end

  # Builds (without saving) a Post from a raw post +record+ hash.
  #
  # 'text' and 'createdAt' become dedicated columns, '$type' is dropped and all
  # remaining fields are stored as JSON in +data+. The caller's +record+ hash is
  # left unchanged.
  #
  # @raise [InvalidRecordError] on any error (e.g. missing/unparseable createdAt
  #   or a record that is not a Hash)
  def build_post(author, rkey, record)
    # work on a copy so the caller's response data is not mutated
    fields = record.dup
    text = fields.delete('text')
    created = fields.delete('createdAt')
    fields.delete('$type')

    Post.new(
      user: author,
      rkey: rkey,
      time: Time.parse(created),
      text: text,
      data: JSON.generate(fields)
    )
  rescue StandardError
    raise InvalidRecordError
  end

  # Links +item+ to +post+, removes it from the queue and updates progress stats.
  def update_item(item, post)
    item.update!(post: post, post_uri: nil, queue: nil)
    record_progress(item)
  end

  # Counts +item+ as processed and deletes it (used when its post is invalid).
  def invalidate_item(item)
    record_progress(item)
    item.destroy
  end

  # Decides what to do with items whose posts getPosts did not return.
  #
  # - author profile still visible on the AppView: the post was deleted, drop the item
  # - account active on its PDS but hidden by the AppView: drop the item
  # - account not found (status nil): drop the item
  # - any other status (inactive/suspended): keep the item, it may come back
  # - status lookup failed: keep the item, unless its PDS is 'bsky.social'
  #
  # Every item that is not destroyed is taken off the queue.
  def check_missing_items(items)
    return if items.empty?

    dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq
    response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids })
    active_dids = response['profiles'].map { |x| x['did'] }.to_set

    items.each do |item|
      did = AT_URI(item.post_uri).repo
      did_obj = DID.new(did)

      if active_dids.include?(did)
        # account exists but post doesn't, delete the post reference
        item.destroy
      else
        begin
          case cached_account_status(did, did_obj)
          when :active
            # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
            item.destroy
          when nil
            # account was deleted, so all posts were deleted too
            item.destroy
          else
            # account is inactive/suspended, but could come back, so leave it for now
          end
        rescue StandardError => e
          hostname = pds_hostname(did_obj)
          @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

          # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
          item.destroy if hostname == 'bsky.social'
        end
      end

      item.update!(queue: nil) unless item.destroyed?
    end
  end

  private

  # Bumps the processed-post counter, tracks the oldest item time seen and
  # pushes both to the report (if any).
  def record_progress(item)
    @total_count += 1
    @oldest_imported = [@oldest_imported, item.time].min
    @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
  end

  # Returns the account status for +did+, memoized per DID. A nil status is
  # cached too, so deleted accounts are not re-checked; failed lookups (raised
  # errors) are not cached.
  def cached_account_status(did, did_obj)
    return @account_status_cache[did] if @account_status_cache.key?(did)

    @account_status_cache[did] = did_obj.account_status
  end

  # Best-effort PDS hostname for log messages; '???' if it can't be resolved.
  def pds_hostname(did_obj)
    did_obj.document.pds_host
  rescue StandardError
    '???'
  end
end