Don't forget to lycansubscribe
require 'json'
require 'time'

require 'didkit'
require 'minisky'

require_relative 'at_uri'
require_relative 'models/post'
require_relative 'models/user'

# Resolves queued items that reference a post only by its at:// URI
# (`post_uri`) into links to saved Post records, downloading the posts
# from the AppView when they are not in the database yet.
#
# Optional collaborators, set through the accessors:
# - report: if set, receives `update(...)` calls with queue length and
#   download progress.
# - stop_when_empty: if truthy, #import_from_queue returns once the queue
#   yields an empty batch instead of polling.
class PostDownloader
  attr_accessor :report, :stop_when_empty

  def initialize
    @sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil)

    @total_count = 0
    @oldest_imported = Time.now
    @account_status_cache = {}
  end

  # Processes batches popped from the queue until it is empty (when
  # stop_when_empty is set) or forever (sleeping 1s between empty polls).
  #
  # @param queue an object responding to `pop_batch` and `length`;
  #   `pop_batch` must return a collection of items responding to
  #   `post_uri`, `time`, `update!` and `destroy`
  def import_from_queue(queue)
    loop do
      items = queue.pop_batch

      if items.empty?
        return if @stop_when_empty

        sleep 1
        next
      end

      @report&.update(queue: { length: queue.length })

      pending = link_existing_posts(items)
      next if pending.empty?

      begin
        pending = download_posts(pending)
        check_missing_items(pending)
      rescue StandardError => e
        # best effort: a failed batch is logged and the loop moves on
        puts "Error in PostDownloader: #{e.class}: #{e}"
      end
    end
  end

  # Creates a Post (and its author User, if missing) from a post record.
  #
  # Note: removes the 'text' and 'createdAt' keys from `record` in place;
  # the remaining keys are serialized to JSON into the post's `data`.
  #
  # @param post_uri [String] at:// URI of the post
  # @param record [Hash] post record as returned by the AppView
  # @return the result of `Post.create` (check `valid?` on it)
  def save_post(post_uri, record)
    did, _, rkey = AT_URI(post_uri)

    text = record.delete('text')
    created = record.delete('createdAt')

    author = User.find_or_create_by!(did: did)

    Post.create(
      user: author,
      rkey: rkey,
      time: Time.parse(created),
      text: text,
      data: JSON.generate(record)
    )
  end

  # Links the item to the given post, clears its pending URI and records
  # the progress.
  def update_item(item, post)
    item.update!(post: post, post_uri: nil)
    record_progress(item)
  end

  # Counts the item as processed and then deletes it.
  def invalidate_item(item)
    record_progress(item)
    item.destroy
  end

  # Decides what to do with items whose posts were not returned by the
  # AppView: the item is deleted when the author's profile is returned by
  # getProfiles; otherwise the account status decides (see
  # #handle_unavailable_account).
  #
  # @param items collection of items that still have `post_uri` set
  def check_missing_items(items)
    return if items.empty?

    dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq
    response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids })
    active_dids = response['profiles'].map { |x| x['did'] }

    items.each do |item|
      did = AT_URI(item.post_uri).repo

      if active_dids.include?(did)
        # account exists but post doesn't, delete the post reference
        item.destroy
      else
        handle_unavailable_account(item, did)
      end
    end
  end

  private

  # Links items whose posts are already saved in the database.
  # Returns the items that still need to be downloaded.
  def link_existing_posts(items)
    rkeys = items.map { |x| AT_URI(x.post_uri).rkey }
    existing_posts = Post.where(rkey: rkeys).to_a

    items.reject do |item|
      # rkeys are only matched as a prefilter, so compare the full URI here
      saved = existing_posts.find { |existing| existing.at_uri == item.post_uri }
      update_item(item, saved) if saved
      saved
    end
  end

  # Fetches the posts referenced by the items from the AppView, saves them
  # and links every item referencing each returned post.
  # Returns the items whose posts were not included in the response.
  def download_posts(items)
    uris = items.map(&:post_uri).uniq
    response = @sky.get_request('app.bsky.feed.getPosts', { uris: uris })
    pending = items.dup

    response['posts'].each do |data|
      uri = data['uri']

      # several items in one batch may reference the same post; all of them
      # must be taken out of the "missing" list and linked to the saved post
      matched, pending = pending.partition { |x| x.post_uri == uri }
      next if matched.empty?

      begin
        post = save_post(uri, data['record'])

        if post.valid?
          matched.each { |item| update_item(item, post) }
        else
          puts "Invalid post #{uri}: #{post.errors.full_messages.join('; ')}"
          matched.each { |item| invalidate_item(item) }
        end
      rescue StandardError => e
        puts "Error in PostDownloader: #{uri}: #{e.class}: #{e}"
      end
    end

    pending
  end

  # Bumps the processed counter, tracks the oldest processed item time and
  # pushes both to the report, if one is set.
  def record_progress(item)
    @total_count += 1
    @oldest_imported = [@oldest_imported, item.time].min

    @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
  end

  # Handles an item whose author was not returned by getProfiles, based on
  # the account status reported for the DID.
  def handle_unavailable_account(item, did)
    did_obj = nil

    begin
      # built inside the rescued section so that a DID that can't be parsed
      # only affects this item instead of aborting the whole batch
      did_obj = DID.new(did)

      case cached_account_status(did, did_obj)
      when :active
        # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
        puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
        item.destroy
      when nil
        # account was deleted, so all posts were deleted too
        puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
        item.destroy
      else
        # account is inactive/suspended, but could come back, so leave it for now
        puts "#{item.post_uri}: account #{did} is inactive: #{@account_status_cache[did]}"
      end
    rescue StandardError => e
      hostname = pds_hostname(did_obj)
      puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

      # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
      item.destroy if hostname == 'bsky.social'
    end
  end

  # Returns the account status for the DID, looking it up at most once per
  # DID; a nil status is cached too, so it is not retried.
  def cached_account_status(did, did_obj)
    return @account_status_cache[did] if @account_status_cache.key?(did)

    @account_status_cache[did] = did_obj.account_status
  end

  # Returns the PDS hostname from the DID document, or "???" if it can't be
  # determined for any reason (including a DID object that failed to build).
  def pds_hostname(did_obj)
    did_obj.document.pds_host
  rescue StandardError
    '???'
  end
end