Template of a custom feed generator service for the Bluesky network in Ruby
1$LOAD_PATH.unshift(File.expand_path('../..', __dir__))
2
3require 'app/config'
4require 'app/models/feed_post'
5require 'app/models/post'
6require 'app/post_console_printer'
7require 'app/utils'
8
9require 'base64'
10require 'json'
11require 'open-uri'
12require 'set'
13
14
# Resolves the feed selected through the KEY environment variable.
#
# Prints a hint and terminates the process with exit status 1 when KEY is
# unset or empty, or when BlueFactory.get_feed returns nil for that key.
#
# @return [Object] whatever BlueFactory.get_feed returns for the given key
def get_feed
  key = ENV['KEY']

  if key.nil? || key.empty?
    puts "Please specify feed key as KEY=feedname (the part of the feed's at:// URI after the last slash)"
    exit 1
  end

  found = BlueFactory.get_feed(key)
  return found unless found.nil?

  puts "No feed configured for key '#{key}' - use `BlueFactory.add_feed '#{key}', MyFeed.new`"
  exit 1
end
31
# Builds an unsigned, JWT-shaped token for local testing.
#
# The token has the usual three dot-separated segments: a fixed header
# (typ JWT, alg ES256K), the given payload, and the literal placeholder
# signature 'fakesig' - it is NOT cryptographically valid.
#
# @param payload [Hash] claims to serialize as the middle segment
# @return [String] "<base64 header>.<base64 payload>.fakesig"
def make_jwt(payload)
  header = { typ: 'JWT', alg: 'ES256K' }
  sig = 'fakesig'

  # strict_encode64 never inserts line feeds; encode64 wraps every 60 characters,
  # which would leave newlines inside the token for payloads longer than 45 bytes
  # (and a token with newlines cannot be sent in an HTTP header)
  fields = [header, payload].map { |d| Base64.strict_encode64(JSON.generate(d)) } + [sig]
  fields.join('.')
end
39
# Prints the newest posts of the feed selected with KEY straight from the
# database. N sets how many posts are loaded (100 when N is not set).
desc "Print posts in the feed, starting from the newest ones (limit = N)"
task :print_feed do
  feed = get_feed
  limit = (ENV['N'] || 100).to_i

  newest_entries = FeedPost
    .where(feed_id: feed.feed_id)
    .joins(:post)
    .order('feed_posts.time DESC')
    .limit(limit)

  posts = newest_entries.map(&:post)

  # this fixes an error when piping a long output to less and then closing without reading it all
  Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")

  printer = PostConsolePrinter.new(feed)
  posts.each { |post| printer.display(post) }
end
56
# Fetches the feed selected with KEY through the local XRPC endpoint
# (app.bsky.feed.getFeedSkeleton) and prints the returned posts that are
# found in the local database.
#
# Environment:
#   N    - number of posts to request (100 when not set)
#   DID  - DID put in the token's "iss" field (defaults to BlueFactory.publisher_did)
#   PORT - port of the local server (defaults to BlueFactory::Server.settings.port)
desc "Print feed by making an HTTP connection to the XRPC endpoint"
task :test_feed do
  feed = get_feed
  limit = ENV['N'] ? ENV['N'].to_i : 100
  actor = ENV['DID'] || BlueFactory.publisher_did
  jwt = make_jwt({ iss: actor })

  puts "Loading feed..."

  feed_uri = "at://#{BlueFactory.publisher_did}/app.bsky.feed.generator/#{ENV['KEY']}"
  port = ENV['PORT'] || BlueFactory::Server.settings.port

  # escape the parameters, so that characters like '%', '&' or '#' in the DID or
  # the feed key reach the server unchanged instead of altering the query string
  query = URI.encode_www_form(limit: limit, feed: feed_uri)
  url = "http://localhost:#{port}/xrpc/app.bsky.feed.getFeedSkeleton?#{query}"
  headers = { 'Authorization' => "Bearer #{jwt}" }

  json = JSON.parse(URI.open(url, headers).read)
  post_uris = json['feed'].map { |x| x['post'] }

  puts "Loading posts..."

  # URIs that have no matching record locally are skipped
  posts = post_uris.map { |uri| Post.find_by_at_uri(uri) }.compact

  # this fixes an error when piping a long output to less and then closing without reading it all
  Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
  printer = PostConsolePrinter.new(feed)

  posts.each do |s|
    printer.display(s)
  end
end
85
# Removes one post from the feed selected with KEY. The post is given as a
# web URL in the URL variable; the author is read from the third and the
# record key from the fifth path segment after the scheme, i.e.
# https://bsky.app/profile/<handle or DID>/post/<rkey>.
desc "Remove a single post from a feed"
task :delete_feed_item do
  feed = get_feed

  if ENV['URL'].to_s == ''
    puts "Please specify post url as URL=https://bsky.app/..."
    exit 1
  end

  url = ENV['URL']
  parts = url.sub(/\Ahttps:\/\//, '').split('/')
  author = parts[2]
  rkey = parts[4]

  # a URL with too few path segments would otherwise crash below with a NoMethodError on nil
  if author.to_s.empty? || rkey.to_s.empty?
    puts "Invalid post url '#{url}' - expected https://bsky.app/profile/<handle or DID>/post/<rkey>"
    exit 1
  end

  # the author segment can be either a DID or a handle; resolve the other one
  if author.start_with?('did:')
    did = author
    handle = Utils.handle_from_did(did)
  else
    handle = author
    did = Utils.did_from_handle(handle)
  end

  if item = FeedPost.joins(:post).find_by(feed_id: feed.feed_id, post: { repo: did, rkey: rkey })
    item.destroy
    puts "Deleted post by @#{handle} from #{feed.display_name} feed"
  else
    puts "Post not found in the feed"
  end
end
115
# Rebuilds or rescans the feed selected with KEY.
#
# Environment:
#   UNSAFE        - run without wrapping the work in a database transaction
#   DRY_RUN       - passed on as the "dry" flag to the helpers
#   ONLY_EXISTING - only re-check the posts already in the feed (rescan_feed_items)
#   APPEND_ONLY   - passed on to rebuild_feed; cannot be combined with ONLY_EXISTING
#   DAYS          - number of days passed to rebuild_feed (7 when not set)
#   TO_FILE       - when rebuild_feed returns posts, write their ids as JSON to this file
desc "Rescan all posts and rebuild the feed from scratch (DAYS = number of days)"
task :rebuild_feed do
  feed = get_feed
  # :tap simply yields, so the block runs without a transaction around it
  wrapper = ENV['UNSAFE'] ? :tap : :transaction
  dry = !!ENV['DRY_RUN']

  raise "APPEND_ONLY cannot be used together with ONLY_EXISTING" if ENV['ONLY_EXISTING'] && ENV['APPEND_ONLY']

  ActiveRecord::Base.send(wrapper) do
    if ENV['ONLY_EXISTING']
      rescan_feed_items(feed, dry)
    else
      days = (ENV['DAYS'] || 7).to_i
      append_only = !!ENV['APPEND_ONLY']

      matched_posts = rebuild_feed(feed, days, append_only, dry)
      filename = ENV['TO_FILE']

      File.write(filename, matched_posts.map(&:id).to_json) if matched_posts && filename
    end
  end
end
141
# Re-checks every post currently in the feed against feed.post_matches? and
# removes the entries that no longer match.
#
# In a dry run nothing is destroyed; the posts that would be removed are
# printed with PostConsolePrinter instead.
#
# @param feed [Object] must respond to #feed_id and #post_matches?(post)
# @param dry [Boolean] when true, only report what would be deleted
# @return [nil]
def rescan_feed_items(feed, dry = false)
  entries = FeedPost.where(feed_id: feed.feed_id).includes(:post).order('time DESC').to_a

  puts "Processing posts..."

  removed = entries.each_with_object([]) do |entry, acc|
    post = entry.post
    next if feed.post_matches?(post)

    unless dry
      puts "Deleting from feed: ##{post.id} \"#{post.text}\""
      entry.destroy
    end

    acc << post
  end

  unless dry
    puts "Done (#{removed.length} post(s) deleted)."
    return
  end

  # this fixes an error when piping a long output to less and then closing without reading it all
  Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
  printer = PostConsolePrinter.new(feed)

  puts
  puts Rainbow("Posts to delete:").red
  puts Rainbow("==============================").red
  puts

  removed.each { |post| printer.display(post) }

  puts Rainbow("#{removed.length} post(s) would be deleted.").red
end
179
# Scans stored posts in (time, id) order and adds every post accepted by
# feed.post_matches? to the feed.
#
# Unless append_only or dry is set, asks for confirmation on STDIN and then
# deletes all existing entries of the feed before scanning. With append_only,
# existing entries are kept and only posts not yet in the feed are added.
# With dry, nothing is deleted or created.
#
# The scan starts after the newest post that is at least `days` days old;
# if there is no such post, all posts are scanned.
#
# @param feed [Object] must respond to #feed_id and #post_matches?(post)
# @param days [Integer] size of the scanned time window in days
# @param append_only [Boolean] keep the current feed contents, add only missing posts
# @param dry [Boolean] when true, only collect and print the matching posts
# @return [Array, nil] the matched posts when dry or ENV['VERBOSE'] is set, otherwise nil
def rebuild_feed(feed, days, append_only, dry = false)
  # Start from "nothing is in the feed yet" - in a dry run of a full rebuild neither
  # branch below runs, and the membership check in the loop must still have a collection
  current_post_ids = Set.new

  if append_only
    feed_posts = FeedPost.where(feed_id: feed.feed_id)
    current_post_ids = Set.new(feed_posts.pluck('post_id'))
  elsif !dry
    print "This will erase and replace the contents of the feed. Continue? [y/n]: "
    answer = STDIN.readline
    exit unless answer.strip.downcase == 'y'

    puts "Cleaning up feed..."
    FeedPost.where(feed_id: feed.feed_id).delete_all
  end

  puts "Counting posts..."
  posts = Post.order('time, id')
  start = posts.where("time <= DATETIME('now', '-#{days} days')").last
  total = start ? Post.where("time > DATETIME('now', '-#{days} days')").count : Post.count

  offset = 0
  page = 100000
  matched_posts = []

  # Page through the posts using the last processed (time, id) pair as the cursor
  loop do
    batch = if start
      posts.where("time > ? OR (time = ? AND id > ?)", start.time, start.time, start.id).limit(page).to_a
    else
      posts.limit(page).to_a
    end

    break if batch.empty?

    batch.each_with_index do |post, i|
      $stderr.print "Processing posts... [#{offset + i + 1}/#{total}]\r" if i % 100 == 99
      $stderr.flush

      if !current_post_ids.include?(post.id) && feed.post_matches?(post)
        matched_posts << post
        FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time) unless dry
      end
    end

    offset += page
    start = batch.last
  end

  # trailing spaces overwrite what is left of the progress line
  $stderr.puts "Processing posts... Done." + " " * 30

  # NOTE(review): the matched posts are only returned from inside this branch, so
  # without DRY_RUN or VERBOSE the caller gets nil - confirm that this is intended
  if dry || ENV['VERBOSE']
    if append_only
      puts((dry ? "Posts to add: " : "Added posts: ") + matched_posts.length.to_s)
      puts "=============================="
      puts
    end

    # this fixes an error when piping a long output to less and then closing without reading it all
    Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
    printer = PostConsolePrinter.new(feed)

    matched_posts.reverse.each do |p|
      printer.display(p)
    end

    matched_posts
  end
end