Don't forget to lycansubscribe
1require 'didkit'
2require 'minisky'
3
4require_relative 'at_uri'
5require_relative 'models/post'
6require_relative 'models/user'
7
# Resolves queued items that reference posts by AT URI: links each item to a
# Post record (already stored or freshly fetched from the AppView), and cleans
# up items whose post can no longer be retrieved.
#
# Items are used through #post_uri, #time, #update!, #destroy and #destroyed?;
# the queue through #pop_batch and #length.
class PostDownloader
  # Record keys that build_post hands to Post.new as separate attributes
  # ('text', 'createdAt') or discards ('$type'); everything else is serialized
  # into the post's `data` field.
  EXTRACTED_RECORD_KEYS = %w[text createdAt $type].freeze

  attr_accessor :report, :logger, :stop_when_empty

  def initialize
    @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)

    @total_count = 0
    @oldest_imported = Time.now
    @account_status_cache = {}
  end

  # Keeps popping batches from the queue and processing them.
  #
  # Returns once the queue yields an empty batch if stop_when_empty is set;
  # otherwise sleeps for a second and polls again (i.e. runs forever).
  #
  # @param queue [#pop_batch, #length]
  def import_from_queue(queue)
    loop do
      items = queue.pop_batch

      if items.empty?
        return if @stop_when_empty

        sleep 1
        next
      end

      @report&.update(queue: { length: queue.length })

      process_items(items)
    end
  end

  # Processes one batch: links items whose post is already stored, downloads
  # the rest with app.bsky.feed.getPosts, and passes whatever the AppView did
  # not return to check_missing_items.
  #
  # The passed array is not modified. Errors raised while downloading are
  # logged as warnings and swallowed, so unprocessed items stay as they were.
  #
  # @param items [Array] queued items with a post_uri
  def process_items(items)
    pending = link_existing_posts(items)
    return if pending.empty?

    begin
      # NOTE(review): the whole batch goes out in a single getPosts call; presumably
      # pop_batch keeps it within the API's per-request URI limit - confirm.
      response = @sky.get_request('app.bsky.feed.getPosts', { uris: pending.map(&:post_uri).uniq })

      response['posts'].each do |data|
        # several items may point at the same post; handle them together
        current_items, pending = pending.partition { |x| x.post_uri == data['uri'] }
        import_post(data, current_items)
      end

      check_missing_items(pending)
    rescue StandardError => e
      @logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
    end
  end

  # Finds or creates the Post for the given URI and record.
  #
  # @param post_uri [String] AT URI of the post
  # @param record [Hash] the post record as returned by the AppView
  # @return [Post] an existing post, or a new one after an attempted save
  #   (callers check #valid? to see whether the save went through)
  # @raise [InvalidRecordError] if the author can't be saved or the record can't be turned into a Post
  def save_post(post_uri, record)
    uri = AT_URI(post_uri)

    begin
      author = User.find_or_create_by!(did: uri.repo)
    rescue ActiveRecord::RecordInvalid
      # the original exception stays reachable through #cause
      raise InvalidRecordError
    end

    Post.find_by(user: author, rkey: uri.rkey) || build_post(author, uri.rkey, record).tap(&:save)
  end

  # Builds (without saving) a Post from a record hash. The record itself is
  # left untouched.
  #
  # @param author [User]
  # @param rkey [String]
  # @param record [Hash] must contain a parseable 'createdAt'
  # @return [Post]
  # @raise [InvalidRecordError] on any error while building the post
  def build_post(author, rkey, record)
    extra_fields = record.reject { |key, _| EXTRACTED_RECORD_KEYS.include?(key) }

    Post.new(
      user: author,
      rkey: rkey,
      time: Time.parse(record['createdAt']),
      text: record['text'],
      data: JSON.generate(extra_fields)
    )
  rescue StandardError
    raise InvalidRecordError
  end

  # Links the item to its post, clears its post_uri and queue, and counts it
  # in the progress report.
  def update_item(item, post)
    item.update!(post: post, post_uri: nil, queue: nil)
    track_progress(item)
  end

  # Counts the item in the progress report and deletes it.
  def invalidate_item(item)
    track_progress(item)
    item.destroy
  end

  # Handles items whose post was not returned by getPosts. An item is deleted
  # when its author is known to the AppView (so only the post is gone), or when
  # the account status lookup says the reference is dead; otherwise it is kept
  # and only taken off the queue.
  #
  # @param items [Array] items with a post_uri that could not be downloaded
  def check_missing_items(items)
    return if items.empty?

    dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq
    response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids })
    active_dids = response['profiles'].map { |x| x['did'] }

    items.each do |item|
      did = AT_URI(item.post_uri).repo

      # account exists but post doesn't => delete the post reference;
      # the DID is only resolved when the AppView doesn't know the account
      item.destroy if active_dids.include?(did) || drop_reference?(item, did)

      item.update!(queue: nil) unless item.destroyed?
    end
  end

  private

  # Links every item whose post is already stored and returns the remaining
  # items as a new array.
  def link_existing_posts(items)
    rkeys = items.map { |x| AT_URI(x.post_uri).rkey }

    # rkeys alone are not unique across authors, so match on the full URI
    posts_by_uri = Post.where(rkey: rkeys).to_a.each_with_object({}) do |existing, index|
      index[existing.at_uri] ||= existing
    end

    pending = []

    items.each do |item|
      if (existing = posts_by_uri[item.post_uri])
        update_item(item, existing)
      else
        pending << item
      end
    end

    pending
  end

  # Saves one post returned by getPosts and updates all items pointing at it.
  def import_post(data, current_items)
    post = save_post(data['uri'], data['record'])

    if post.valid?
      current_items.each { |i| update_item(i, post) }
    else
      @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
      current_items.each { |i| invalidate_item(i) }
    end
  rescue InvalidRecordError => e
    # InvalidRecordError is raised without a message, so include what triggered it
    details = e.cause ? " (caused by #{e.cause.class}: #{e.cause.message})" : ''
    @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}#{details}"
    current_items.each { |i| i.update!(queue: nil) }
  end

  # Shared bookkeeping for update_item and invalidate_item.
  def track_progress(item)
    @total_count += 1
    # compact guards against an item without a time - TODO confirm whether that can happen
    @oldest_imported = [@oldest_imported, item.time].compact.min

    @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
  end

  # Tells whether the item's reference should be deleted, for an account that
  # getProfiles did not return. If the status can't be checked, a warning is
  # logged and the reference is only dropped for accounts hosted on bsky.social.
  def drop_reference?(item, did)
    did_obj = DID.new(did)

    case cached_account_status(did, did_obj)
    when :active
      # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
      true
    when nil
      # account was deleted, so all posts were deleted too
      true
    else
      # account is inactive/suspended, but could come back, so leave it for now
      false
    end
  rescue StandardError => e
    hostname = pds_hostname(did_obj)
    @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

    # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
    hostname == 'bsky.social'
  end

  # Returns the account status for a DID, looking it up at most once.
  def cached_account_status(did, did_obj)
    # key check instead of ||=, so that a cached nil status is not retried
    return @account_status_cache[did] if @account_status_cache.key?(did)

    @account_status_cache[did] = did_obj.account_status
  end

  # Best-effort PDS hostname for log messages; "???" if it can't be determined.
  def pds_hostname(did_obj)
    did_obj.document.pds_host
  rescue StandardError
    '???'
  end
end