Don't forget to lycansubscribe
require 'json'
require 'time'

require 'didkit'
require 'minisky'

require_relative 'at_uri'
require_relative 'models/post'
require_relative 'models/user'

# Resolves queued items that reference a post by AT URI (`post_uri`) into
# Post records: items whose post is already stored are linked to it, the rest
# are fetched through `app.bsky.feed.getPosts` and saved, and items whose post
# could not be fetched are either destroyed or just taken off the queue,
# depending on the state of the author's account.
#
# Optional collaborators, set through the accessors:
# * `report` - receives `update(...)` calls with queue length and progress
# * `logger` - receives `warn` messages
# * `stop_when_empty` - when truthy, `import_from_queue` returns once the
#   queue yields an empty batch instead of polling again
class PostDownloader
  # Top-level record keys that are not copied into Post#data: 'text' and
  # 'createdAt' are stored in their own attributes, '$type' is dropped.
  EXTRACTED_RECORD_KEYS = %w[text createdAt $type].freeze
  private_constant :EXTRACTED_RECORD_KEYS

  attr_accessor :report, :logger, :stop_when_empty

  # Sets up the API client (host taken from ENV['APPVIEW_HOST'], falling back
  # to 'public.api.bsky.app') and the progress counters.
  def initialize
    @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)

    @total_count = 0
    @oldest_imported = Time.now
    @account_status_cache = {}
  end

  # Repeatedly pops batches from the queue and processes them.
  #
  # When a batch is empty, returns if `stop_when_empty` is set; otherwise
  # sleeps for a second and polls again (so without that flag this method
  # never returns).
  #
  # @param queue [#pop_batch, #length] source of item batches
  # @return [nil]
  def import_from_queue(queue)
    loop do
      items = queue.pop_batch

      if items.empty?
        return if @stop_when_empty

        sleep 1
        next
      end

      @report&.update(queue: { length: queue.length })

      process_items(items)
    end
  end

  # Processes one batch of items.
  #
  # 1. Items whose post is already stored are linked to it.
  # 2. The remaining posts are requested from `app.bsky.feed.getPosts` and
  #    saved; their items are linked (or invalidated if the post doesn't
  #    validate).
  # 3. Items whose post wasn't returned go to #check_missing_items.
  #
  # Any StandardError raised in steps 2-3 is logged and swallowed. The passed
  # array itself is not modified.
  #
  # @param items [Array] items responding to `post_uri`, `time`, `update!`
  #   and `destroy`
  # @return [void]
  def process_items(items)
    existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a

    # the query above matches on rkey only, so confirm each match by full URI
    missing = []

    items.each do |item|
      if (post = existing_posts.find { |existing| existing.at_uri == item.post_uri })
        update_item(item, post)
      else
        missing << item
      end
    end

    return if missing.empty?

    begin
      response = @sky.get_request('app.bsky.feed.getPosts', { uris: missing.map(&:post_uri).uniq })

      response['posts'].each do |data|
        current_items, missing = missing.partition { |x| x.post_uri == data['uri'] }
        import_fetched_post(data, current_items)
      end

      # whatever is left was not included in the response
      check_missing_items(missing)
    rescue StandardError => e
      @logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
    end
  end

  # Returns the stored Post for the given URI, creating it (and its author)
  # if needed. A newly built post is saved with `save`, so the caller should
  # check `valid?` on the result.
  #
  # @param post_uri [String] AT URI of the post
  # @param record [Hash] post record, as returned in the getPosts response
  # @return [Post]
  # @raise [InvalidRecordError] if the author can't be created or the post
  #   can't be built from the record
  def save_post(post_uri, record)
    did, _, rkey = AT_URI(post_uri)

    begin
      author = User.find_or_create_by!(did: did)
    rescue ActiveRecord::RecordInvalid
      raise InvalidRecordError
    end

    existing = Post.find_by(user: author, rkey: rkey)
    return existing if existing

    post = build_post(author, rkey, record)
    post.save
    post
  end

  # Builds an unsaved Post from a post record. 'text' and 'createdAt' go to
  # their own attributes, '$type' is dropped, and the remaining keys are
  # serialized to JSON in `data`. The passed record is left unmodified.
  #
  # @param author [User]
  # @param rkey [String]
  # @param record [Hash]
  # @return [Post]
  # @raise [InvalidRecordError] on any StandardError, e.g. a missing or
  #   unparseable 'createdAt'
  def build_post(author, rkey, record)
    extra_fields = record.reject { |key, _| EXTRACTED_RECORD_KEYS.include?(key) }

    Post.new(
      user: author,
      rkey: rkey,
      time: Time.parse(record['createdAt']),
      text: record['text'],
      data: JSON.generate(extra_fields)
    )
  rescue StandardError
    raise InvalidRecordError
  end

  # Links the item to the post, clears its `post_uri` and `queue`, and
  # reports the progress.
  #
  # @param item [#update!, #time]
  # @param post [Post]
  # @return [void]
  def update_item(item, post)
    item.update!(post: post, post_uri: nil, queue: nil)
    record_progress(item)
  end

  # Counts the item as processed, reports the progress and destroys the item.
  #
  # @param item [#destroy, #time]
  # @return [void]
  def invalidate_item(item)
    record_progress(item)
    item.destroy
  end

  # Handles items whose posts were not returned by getPosts.
  #
  # Asks `app.bsky.actor.getProfiles` about the authors. If the author's
  # profile is returned, the item is destroyed; otherwise the decision is
  # based on the account status reported through DID#account_status (results
  # are cached per DID for the lifetime of this object). Items which are not
  # destroyed are taken off the queue.
  #
  # @param items [Array] items responding to `post_uri`, `destroy`,
  #   `destroyed?` and `update!`
  # @return [void]
  def check_missing_items(items)
    return if items.empty?

    dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq
    response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids })
    active_dids = response['profiles'].map { |x| x['did'] }

    items.each do |item|
      did = AT_URI(item.post_uri).repo

      if active_dids.include?(did)
        # account exists but post doesn't, delete the post reference
        item.destroy
      else
        destroy_if_account_gone(item, did)
      end

      item.update!(queue: nil) unless item.destroyed?
    end
  end

  private

  # Saves one post from the getPosts response and resolves the items which
  # were waiting for it: linked if the post is valid, invalidated if not,
  # and only taken off the queue if the record could not be processed.
  def import_fetched_post(data, current_items)
    post = save_post(data['uri'], data['record'])

    if post.valid?
      current_items.each { |i| update_item(i, post) }
    else
      @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
      current_items.each { |i| invalidate_item(i) }
    end
  rescue InvalidRecordError => e
    @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
    current_items.each { |i| i.update!(queue: nil) }
  end

  # Bumps the processed counter, tracks the earliest item time seen so far
  # and pushes both to the report, if one is set.
  def record_progress(item)
    @total_count += 1
    @oldest_imported = [@oldest_imported, item.time].min

    @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
  end

  # Destroys the item if the author's account status says the post can't
  # come back; leaves the item untouched otherwise.
  #
  # The DID object is built inside the rescued section, so if DID.new raises
  # for this identifier, it is logged and handled for this one item like any
  # other failed status check, instead of interrupting the rest of the batch.
  def destroy_if_account_gone(item, did)
    did_obj = DID.new(did)

    # has_key? instead of a plain ||=, so that a cached nil isn't looked up again
    status = if @account_status_cache.has_key?(did)
      @account_status_cache[did]
    else
      @account_status_cache[did] = did_obj.account_status
    end

    case status
    when :active
      # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
      # puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
      item.destroy
    when nil
      # account was deleted, so all posts were deleted too
      # puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
      item.destroy
    else
      # account is inactive/suspended, but could come back, so leave it for now
      # puts "#{item.post_uri}: account #{did} is inactive: #{status}"
    end
  rescue StandardError => e
    # did_obj is nil here if DID.new itself failed
    hostname = pds_hostname(did_obj)
    @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

    # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
    item.destroy if hostname == 'bsky.social'
  end

  # Best-effort lookup of the PDS hostname from the DID document, used for
  # the log message and the bsky.social check; "???" if it can't be found.
  def pds_hostname(did_obj)
    did_obj.document.pds_host
  rescue StandardError
    "???"
  end
end