Template of a custom feed generator service for the Bluesky network in Ruby
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 832b4b435614bf02c45ad0ca32b56102f884ec49 233 lines 5.9 kB view raw
$LOAD_PATH.unshift(File.expand_path('../..', __dir__))

require 'app/config'
require 'app/models/feed_post'
require 'app/models/post'
require 'app/post_console_printer'
require 'app/utils'

require 'base64'
require 'json'
require 'open-uri'
require 'set'


# Looks up the feed selected with the KEY environment variable.
#
# Prints an explanation and exits with status 1 if KEY is not set, or if
# BlueFactory has no feed registered under that key.
#
# @return the feed object returned by BlueFactory.get_feed
def get_feed
  feed_key = ENV['KEY'].to_s

  if feed_key.empty?
    puts "Please specify feed key as KEY=feedname (the part of the feed's at:// URI after the last slash)"
    exit 1
  end

  feed = BlueFactory.get_feed(feed_key)

  if feed.nil?
    puts "No feed configured for key '#{feed_key}' - use `BlueFactory.add_feed '#{feed_key}', MyFeed.new`"
    exit 1
  end

  feed
end

# Builds a JWT-shaped token ("header.payload.signature") for local testing.
# The signature part is the fixed placeholder string 'fakesig', not a real signature.
#
# @param payload [Hash] claims to serialize as the token's payload
# @return [String] the three dot-separated token parts
def make_jwt(payload)
  header = { typ: 'JWT', alg: 'ES256K' }
  sig = 'fakesig'

  # strict_encode64 never inserts line breaks; encode64 wraps its output every 60 characters,
  # which would put newlines inside the token for any JSON payload longer than 45 bytes
  fields = [header, payload].map { |d| Base64.strict_encode64(JSON.generate(d)) } + [sig]
  fields.join('.')
end

# Prints each of the given posts to the console using PostConsolePrinter.
#
# @param feed the feed object passed to PostConsolePrinter.new
# @param posts [Array] posts to display, in order
def print_posts_to_console(feed, posts)
  # this fixes an error when piping a long output to less and then closing without reading it all
  Signal.trap("SIGPIPE", "SYSTEM_DEFAULT")

  printer = PostConsolePrinter.new(feed)
  posts.each { |post| printer.display(post) }
end

desc "Print posts in the feed, starting from the newest ones (limit = N)"
task :print_feed do
  feed = get_feed
  limit = ENV['N'] ? ENV['N'].to_i : 100

  posts = FeedPost.where(feed_id: feed.feed_id).joins(:post).order('feed_posts.time DESC').limit(limit).map(&:post)

  print_posts_to_console(feed, posts)
end

desc "Print feed by making an HTTP connection to the XRPC endpoint"
task :test_feed do
  feed = get_feed
  limit = ENV['N'] ? ENV['N'].to_i : 100

  # the request is made on behalf of DID, or of the feed publisher if DID is not set
  actor = ENV['DID'] || BlueFactory.publisher_did
  jwt = make_jwt({ iss: actor })

  puts "Loading feed..."

  feed_uri = "at://#{BlueFactory.publisher_did}/app.bsky.feed.generator/#{ENV['KEY']}"
  port = ENV['PORT'] || BlueFactory::Server.settings.port
  url = "http://localhost:#{port}/xrpc/app.bsky.feed.getFeedSkeleton?limit=#{limit}&feed=#{feed_uri}"
  headers = { 'Authorization' => "Bearer #{jwt}" }

  json = JSON.parse(URI.open(url, headers).read)
  post_uris = json['feed'].map { |x| x['post'] }

  puts "Loading posts..."

  # URIs returned by the endpoint that have no matching Post record are skipped
  posts = post_uris.map { |uri| Post.find_by_at_uri(uri) }.compact

  print_posts_to_console(feed, posts)
end

desc "Remove a single post from a feed"
task :delete_feed_item do
  feed = get_feed

  if ENV['URL'].to_s == ''
    puts "Please specify post url as URL=https://bsky.app/..."
    exit 1
  end

  url = ENV['URL']

  # after stripping the scheme, the path segments are: host / profile / <author> / post / <rkey>
  parts = url.sub(%r{\Ahttps://}, '').split('/')
  author = parts[2]
  rkey = parts[4]

  if author.to_s.empty? || rkey.to_s.empty?
    puts "Invalid post URL: #{url} (expected https://bsky.app/profile/<handle or DID>/post/<rkey>)"
    exit 1
  end

  # the author segment may be either a DID or a handle; resolve whichever one is missing
  if author.start_with?('did:')
    did = author
    handle = Utils.handle_from_did(did)
  else
    handle = author
    did = Utils.did_from_handle(handle)
  end

  if item = FeedPost.joins(:post).find_by(feed_id: feed.feed_id, post: { repo: did, rkey: rkey })
    item.destroy
    puts "Deleted post by @#{handle} from #{feed.display_name} feed"
  else
    puts "Post not found in the feed"
  end
end

desc "Rescan all posts and rebuild the feed from scratch (DAYS = number of days)"
task :rebuild_feed do
  feed = get_feed

  # with UNSAFE set, the block is run through `tap` (i.e. just executed) instead of inside a transaction
  wrapper = ENV['UNSAFE'] ? :tap : :transaction
  dry = !!ENV['DRY_RUN']

  ActiveRecord::Base.send(wrapper) do
    if ENV['ONLY_EXISTING']
      rescan_feed_items(feed, dry)
    else
      days = ENV['DAYS'] ? ENV['DAYS'].to_i : 7
      append_only = !!ENV['APPEND_ONLY']

      matched_posts = rebuild_feed(feed, days, append_only, dry)

      # rebuild_feed only returns the matched posts in dry-run mode
      if matched_posts && (filename = ENV['TO_FILE'])
        File.write(filename, matched_posts.map(&:id).to_json)
      end
    end
  end
end

# Re-checks every post currently in the feed and removes the ones that
# no longer pass feed.post_matches?.
#
# @param feed the feed whose items should be rescanned
# @param dry [Boolean] when true, only reports which posts would be deleted
def rescan_feed_items(feed, dry = false)
  feed_posts = FeedPost.where(feed_id: feed.feed_id).includes(:post).to_a

  puts "Processing posts..."

  deleted = 0

  feed_posts.each do |fp|
    next if feed.post_matches?(fp.post)

    if dry
      puts "Post would be deleted: ##{fp.post.id} \"#{fp.post.text}\""
    else
      puts "Deleting from feed: ##{fp.post.id} \"#{fp.post.text}\""
      fp.destroy
    end

    deleted += 1
  end

  if dry
    puts "#{deleted} post(s) would be deleted."
  else
    puts "Done (#{deleted} post(s) deleted)."
  end
end

# Scans saved posts from the last `days` days and adds those passing
# feed.post_matches? to the feed.
#
# Unless append_only or dry is set, asks for confirmation on standard input
# and then deletes all existing items of the feed before scanning.
#
# @param feed the feed to rebuild
# @param days [Integer] how many days back to start scanning from
# @param append_only [Boolean] keep existing feed items and only add posts not yet in the feed
# @param dry [Boolean] when true, nothing is written; matching posts are printed instead
# @return [Array, nil] the matching posts in dry-run mode, nil otherwise
def rebuild_feed(feed, days, append_only, dry = false)
  posts = Post.order('time, id')
  stop = posts.last

  # nothing to scan (and `stop.id` below would fail) if no posts have been saved yet
  if stop.nil?
    puts "No posts found in the database."
    return dry ? [] : nil
  end

  # last post older than the scanned period; scanning begins right after it
  start = posts.where("time <= DATETIME('now', ?)", "-#{days} days").last
  first = posts.first

  # estimate used only for the progress display, based on the range of post ids
  total = stop.id - (start || first).id + 1

  # stays empty in a dry run without append_only, so that every matching post gets listed
  current_post_ids = Set.new

  if append_only
    feed_posts = FeedPost.where(feed_id: feed.feed_id)
    current_post_ids = Set.new(feed_posts.pluck('post_id'))
  elsif !dry
    print "This will erase and replace the contents of the feed. Continue? [y/n]: "

    # gets returns nil at end of input, which is treated like any answer other than "y"
    answer = $stdin.gets.to_s
    exit unless answer.strip.downcase == 'y'

    puts "Cleaning up feed..."
    FeedPost.where(feed_id: feed.feed_id).delete_all
  end

  offset = 0
  page = 100_000
  matched_posts = []
  cursor = start

  loop do
    # load the next page of posts ordered by (time, id), continuing after the last processed post
    batch = if cursor
      posts.where("time > ? OR (time = ? AND id > ?)", cursor.time, cursor.time, cursor.id).limit(page).to_a
    else
      posts.limit(page).to_a
    end

    break if batch.empty?

    batch.each_with_index do |post, i|
      if i % 100 == 99
        $stderr.print "Processing posts... [#{offset + i + 1}/#{total}]\r"
        $stderr.flush
      end

      next if current_post_ids.include?(post.id) || !feed.post_matches?(post)

      if dry
        matched_posts << post
      else
        FeedPost.create!(feed_id: feed.feed_id, post: post, time: post.time)
      end
    end

    offset += page
    cursor = batch.last
  end

  # trailing spaces overwrite what is left of the longer progress line
  $stderr.puts "Processing posts... Done." + " " * 30

  return unless dry

  if append_only
    puts "Added posts:"
    puts "=============================="
    puts
  end

  print_posts_to_console(feed, matched_posts)

  matched_posts
end