Don't forget to lycansubscribe
1require 'didkit'
2require 'minisky'
3
4require_relative 'at_uri'
5require_relative 'models/post'
6require_relative 'models/user'
7
# Turns queued items that reference a post by AT URI (`post_uri`) into items
# linked to a saved Post record.
#
# For every batch popped from the queue the downloader:
#   1. links items whose post is already stored (matched by rkey + at_uri),
#   2. fetches the remaining posts with `app.bsky.feed.getPosts` and saves them,
#   3. for items whose post was not returned, checks the author's account to
#      decide whether the item should be deleted or kept for a later retry.
class PostDownloader
  # report          - optional object that receives progress via #update (may be nil).
  # stop_when_empty - when truthy, #import_from_queue returns as soon as the
  #                   queue yields an empty batch instead of polling it.
  attr_accessor :report, :stop_when_empty

  def initialize
    @sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil)

    @total_count = 0
    @oldest_imported = Time.now
    @account_status_cache = {}
  end

  # Processes batches from the queue until it is empty (if stop_when_empty is
  # set) or forever, sleeping one second between polls of an empty queue.
  #
  # queue - object responding to #pop_batch (array of items) and #length.
  #
  # Errors raised while downloading or checking a batch are printed and the
  # loop continues with the next batch; errors raised while linking items to
  # already stored posts propagate to the caller.
  def import_from_queue(queue)
    loop do
      items = queue.pop_batch

      if items.empty?
        return if @stop_when_empty

        sleep 1
        next
      end

      @report&.update(queue: { length: queue.length })

      pending = link_existing_posts(items)
      next if pending.empty?

      begin
        missing = download_posts(pending)
        check_missing_items(missing)
      rescue StandardError => e
        puts "Error in PostDownloader: #{e.class}: #{e}"
      end
    end
  end

  # Creates a Post (and its author User, if needed) from a post record.
  #
  # post_uri - AT URI string of the post.
  # record   - Hash with the post record; 'text' and 'createdAt' are stored in
  #            their own columns, everything else is serialized into `data`.
  #            The hash passed in is not modified.
  #
  # Returns the result of Post.create, which callers check with #valid?.
  # Raises if the author can't be created or 'createdAt' can't be parsed.
  def save_post(post_uri, record)
    did, _, rkey = AT_URI(post_uri)

    # work on a copy so the caller's hash keeps its 'text' and 'createdAt' keys
    extra = record.dup
    text = extra.delete('text')
    created = extra.delete('createdAt')

    author = User.find_or_create_by!(did: did)

    Post.create(
      user: author,
      rkey: rkey,
      time: Time.parse(created),
      text: text,
      data: JSON.generate(extra)
    )
  end

  # Links the item to the given post, clears its post_uri and reports progress.
  # Progress is only counted if the update succeeds (update! raises otherwise).
  def update_item(item, post)
    item.update!(post: post, post_uri: nil)
    record_progress(item)
  end

  # Counts the item as processed and deletes it, for items whose post could
  # not be saved.
  def invalidate_item(item)
    record_progress(item)
    item.destroy
  end

  # Decides what to do with items whose posts were not returned by getPosts.
  #
  # Items whose author is returned by `app.bsky.actor.getProfiles` are deleted.
  # For the others the account status is looked up (and cached per DID):
  # items are deleted when the status is :active or nil, and kept when the
  # account is in any other state. If the status check fails, the item is only
  # deleted when the account's PDS host is 'bsky.social'.
  def check_missing_items(items)
    return if items.empty?

    dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq
    response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids })
    active_dids = response['profiles'].map { |x| x['did'] }

    items.each do |item|
      did = AT_URI(item.post_uri).repo

      if active_dids.include?(did)
        # account exists but post doesn't, delete the post reference
        item.destroy
        next
      end

      did_obj = nil

      begin
        # built inside the rescue scope so that one bad DID doesn't stop the
        # checks for the remaining items
        did_obj = DID.new(did)

        # key? instead of ||= on read: don't retry if the cached status was nil
        status = if @account_status_cache.key?(did)
          @account_status_cache[did]
        else
          @account_status_cache[did] = did_obj.account_status
        end

        case status
        when :active
          # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
          puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
          item.destroy
        when nil
          # account was deleted, so all posts were deleted too
          puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
          item.destroy
        else
          # account is inactive/suspended, but could come back, so leave it for now
          puts "#{item.post_uri}: account #{did} is inactive: #{status}"
        end
      rescue StandardError => e
        # best effort: any failure here (including did_obj being nil) yields "???"
        hostname = did_obj.document.pds_host rescue "???"
        puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

        # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
        item.destroy if hostname == 'bsky.social'
      end
    end
  end

  private

  # Links every item whose post is already stored and returns the items that
  # still need their post downloaded.
  def link_existing_posts(items)
    rkeys = items.map { |x| AT_URI(x.post_uri).rkey }
    existing_posts = Post.where(rkey: rkeys).to_a

    remaining = []

    items.each do |item|
      # rkeys alone are not unique across authors, so compare the full URI
      stored = existing_posts.find { |candidate| candidate.at_uri == item.post_uri }

      if stored
        update_item(item, stored)
      else
        remaining << item
      end
    end

    remaining
  end

  # Fetches the posts referenced by the items, saves them and links the items.
  # Returns the items whose posts were not included in the response.
  def download_posts(items)
    uris = items.map(&:post_uri).uniq
    response = @sky.get_request('app.bsky.feed.getPosts', { uris: uris })

    response['posts'].each do |data|
      uri = data['uri']

      begin
        # several items may reference the same post (the request de-duplicates
        # URIs), so link all of them instead of only the first one
        matching, items = items.partition { |x| x.post_uri == uri }

        # ignore posts that none of the pending items asked for
        next if matching.empty?

        post = save_post(uri, data['record'])

        if post.valid?
          matching.each { |item| update_item(item, post) }
        else
          puts "Invalid post #{uri}: #{post.errors.full_messages.join("; ")}"
          matching.each { |item| invalidate_item(item) }
        end
      rescue StandardError => e
        # log with the URI from the response: the item's post_uri may already
        # have been cleared by update_item
        puts "Error in PostDownloader: #{uri}: #{e.class}: #{e}"
      end
    end

    items
  end

  # Updates the processed counter and oldest item time, and pushes both to the
  # report if one is set.
  def record_progress(item)
    @total_count += 1
    @oldest_imported = [@oldest_imported, item.time].min

    @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
  end
end