Template of a custom feed generator service for the Bluesky network in Ruby
at master 6.3 kB view raw
$LOAD_PATH.unshift(File.expand_path('../..', __dir__))

require 'app/config'
require 'app/models/feed_post'
require 'app/models/post'
require 'app/post_console_printer'
require 'app/utils'

require 'base64'
require 'json'
require 'open-uri'
require 'set'


# Looks up the feed selected through the KEY environment variable.
#
# Prints a message and exits with status 1 when KEY is missing/empty or when
# BlueFactory has no feed registered under that key.
#
# @return the feed object returned by BlueFactory.get_feed
def get_feed
  if ENV['KEY'].to_s == ''
    puts "Please specify feed key as KEY=feedname (the part of the feed's at:// URI after the last slash)"
    exit 1
  end

  feed_key = ENV['KEY']
  feed = BlueFactory.get_feed(feed_key)

  if feed.nil?
    puts "No feed configured for key '#{feed_key}' - use `BlueFactory.add_feed '#{feed_key}', MyFeed.new`"
    exit 1
  end

  feed
end

# Builds an unsigned, JWT-shaped token ("header.payload.fakesig") for local testing.
# The signature part is a fixed placeholder, so the token is not verifiable.
#
# @param payload [Hash] claims to serialize as the token's payload
# @return [String] the three dot-separated token fields
def make_jwt(payload)
  header = { typ: 'JWT', alg: 'ES256K' }
  sig = 'fakesig'

  # strict_encode64 never inserts line breaks. Base64.encode64 wraps its output
  # every 60 characters, which would put newlines inside the token (and thus
  # inside the Authorization header) for longer payloads. For short inputs the
  # result is identical to the previous `encode64(...).chomp`.
  fields = [header, payload].map { |d| Base64.strict_encode64(JSON.generate(d)) } + [sig]
  fields.join('.')
end

desc "Print posts in the feed, starting from the newest ones (limit = N)"
task :print_feed do
  feed = get_feed
  limit = ENV['N'] ? ENV['N'].to_i : 100

  posts = FeedPost.where(feed_id: feed.feed_id).joins(:post).order('feed_posts.time DESC').limit(limit).map(&:post)

  # this fixes an error when piping a long output to less and then closing without reading it all
  Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")

  printer = PostConsolePrinter.new(feed)

  posts.each do |s|
    printer.display(s)
  end
end

desc "Print feed by making an HTTP connection to the XRPC endpoint"
task :test_feed do
  feed = get_feed
  limit = ENV['N'] ? ENV['N'].to_i : 100
  actor = ENV['DID'] || BlueFactory.publisher_did
  jwt = make_jwt({ iss: actor })

  puts "Loading feed..."

  feed_uri = "at://#{BlueFactory.publisher_did}/app.bsky.feed.generator/#{ENV['KEY']}"
  port = ENV['PORT'] || BlueFactory::Server.settings.port
  url = "http://localhost:#{port}/xrpc/app.bsky.feed.getFeedSkeleton?limit=#{limit}&feed=#{feed_uri}"
  headers = { 'Authorization' => "Bearer #{jwt}" }

  json = JSON.parse(URI.open(url, headers).read)
  post_uris = json['feed'].map { |x| x['post'] }

  puts "Loading posts..."

  # posts that are not found locally are skipped
  posts = post_uris.map { |uri| Post.find_by_at_uri(uri) }.compact

  Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
  printer = PostConsolePrinter.new(feed)

  posts.each do |s|
    printer.display(s)
  end
end

desc "Remove a single post from a feed"
task :delete_feed_item do
  feed = get_feed

  if ENV['URL'].to_s == ''
    puts "Please specify post url as URL=https://bsky.app/..."
    exit 1
  end

  # Expected shape after stripping the scheme: host/profile/<author>/post/<rkey>
  url = ENV['URL']
  parts = url.sub(%r{\Ahttps://}, '').split('/')
  author = parts[2]
  rkey = parts[4]

  # Without this check a malformed URL would crash below with NoMethodError on nil.
  if author.to_s == '' || rkey.to_s == ''
    puts "Please specify post url as URL=https://bsky.app/..."
    exit 1
  end

  if author.start_with?('did:')
    did = author
    handle = Utils.handle_from_did(did)
  else
    handle = author
    did = Utils.did_from_handle(handle)
  end

  if item = FeedPost.joins(:post).find_by(feed_id: feed.feed_id, post: { repo: did, rkey: rkey })
    item.destroy
    puts "Deleted post by @#{handle} from #{feed.display_name} feed"
  else
    puts "Post not found in the feed"
  end
end

desc "Rescan all posts and rebuild the feed from scratch (DAYS = number of days)"
task :rebuild_feed do
  feed = get_feed

  # UNSAFE runs the block via `tap` (no wrapping transaction) instead of `transaction`
  method = ENV['UNSAFE'] ? :tap : :transaction
  dry = !!ENV['DRY_RUN']

  if ENV['ONLY_EXISTING'] && ENV['APPEND_ONLY']
    raise "APPEND_ONLY cannot be used together with ONLY_EXISTING"
  end

  ActiveRecord::Base.send(method) do
    if ENV['ONLY_EXISTING']
      rescan_feed_items(feed, dry)
    else
      days = ENV['DAYS'] ? ENV['DAYS'].to_i : 7
      append_only = !!ENV['APPEND_ONLY']

      matched_posts = rebuild_feed(feed, days, append_only, dry)

      if matched_posts && (filename = ENV['TO_FILE'])
        File.write(filename, matched_posts.map(&:id).to_json)
      end
    end
  end
end

# Re-checks every post currently in the feed and removes those that no longer match.
#
# @param feed the feed object; must respond to feed_id and post_matches?
# @param dry [Boolean] when true, nothing is deleted and the posts that would be
#   removed are printed instead
def rescan_feed_items(feed, dry = false)
  feed_posts = FeedPost.where(feed_id: feed.feed_id).includes(:post).order('time DESC').to_a

  puts "Processing posts..."

  deleted = []

  feed_posts.each do |fp|
    next if feed.post_matches?(fp.post)

    unless dry
      puts "Deleting from feed: ##{fp.post.id} \"#{fp.post.text}\""
      fp.destroy
    end

    deleted << fp.post
  end

  if dry
    Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
    printer = PostConsolePrinter.new(feed)

    puts
    puts Rainbow("Posts to delete:").red
    puts Rainbow("==============================").red
    puts

    deleted.each do |p|
      printer.display(p)
    end

    puts Rainbow("#{deleted.length} post(s) would be deleted.").red
  else
    puts "Done (#{deleted.length} post(s) deleted)."
  end
end

# Scans stored posts from the last `days` days and (re)builds the feed from the matching ones.
#
# @param feed the feed object; must respond to feed_id and post_matches?
# @param days [Integer] how many days back to scan
# @param append_only [Boolean] keep the existing feed items and only add missing ones;
#   when false (and not a dry run) the feed is emptied first, after a y/n confirmation
# @param dry [Boolean] when true, nothing is written to the database
# @return [Array] the posts that matched (and were added, unless dry)
def rebuild_feed(feed, days, append_only, dry = false)
  # `days` is interpolated into SQL below, so make sure it is a plain integer
  days = days.to_i

  # Default to "nothing is in the feed yet". This also covers a dry run without
  # append_only, where neither branch below runs and the variable used to stay nil.
  current_post_ids = Set.new

  if append_only
    feed_posts = FeedPost.where(feed_id: feed.feed_id)
    current_post_ids = Set.new(feed_posts.pluck('post_id'))
  elsif !dry
    print "This will erase and replace the contents of the feed. Continue? [y/n]: "
    answer = STDIN.readline
    exit unless answer.strip.downcase == 'y'

    puts "Cleaning up feed..."
    FeedPost.where(feed_id: feed.feed_id).delete_all
  end

  puts "Counting posts..."
  posts = Post.order('time, id')

  # `start` is the last post older than the cutoff; scanning resumes right after it.
  # If there is no such post, every stored post is scanned.
  start = posts.where("time <= DATETIME('now', '-#{days} days')").last
  total = start ? Post.where("time > DATETIME('now', '-#{days} days')").count : Post.count

  offset = 0
  page = 100000
  matched_posts = []

  loop do
    # keyset pagination on (time, id): fetch the next page strictly after `start`
    batch = if start
      posts.where("time > ? OR (time = ? AND id > ?)", start.time, start.time, start.id).limit(page).to_a
    else
      posts.limit(page).to_a
    end

    break if batch.empty?

    batch.each_with_index do |post, i|
      $stderr.print "Processing posts... [#{offset + i + 1}/#{total}]\r" if i % 100 == 99
      $stderr.flush

      if !current_post_ids.include?(post.id) && feed.post_matches?(post)
        matched_posts << post
        FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time) unless dry
      end
    end

    offset += batch.length
    start = batch.last
  end

  $stderr.puts "Processing posts... Done." + " " * 30

  if dry || ENV['VERBOSE']
    if append_only
      puts((dry ? "Posts to add: " : "Added posts: ") + matched_posts.length.to_s)
      puts "=============================="
      puts
    end

    Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")
    printer = PostConsolePrinter.new(feed)

    matched_posts.reverse.each do |p|
      printer.display(p)
    end
  end

  # Always return the matches, so that the caller's TO_FILE export also works
  # when neither DRY_RUN nor VERBOSE is set.
  matched_posts
end