# Template of a custom feed generator service for the Bluesky network, written in Ruby.
require 'json'
require 'sinatra/activerecord'
require 'skyfall'
require 'time'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'

# Connects to a Jetstream service through Skyfall, receives post events and
# matches them against the configured feeds, storing posts / feed entries
# in the database.
#
# Behaviour is tuned through the accessors below (defaults depend on the
# APP_ENV / RACK_ENV environment, see #initialize):
#
# * show_progress - print a '.' for every processed post
# * log_status    - log connection status changes (connecting, connected, ...)
# * log_posts     - :all prints the text of every post, any other truthy value
#                   prints only posts matching a feed, false/nil prints nothing
# * save_posts    - :all saves every post, any other truthy value saves only
#                   feed entries for matching posts, false/nil saves nothing
# * replay_events - when true, the connection resumes from the saved cursor
class FirehoseStream
  attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_JETSTREAM = 'jetstream2.us-east.bsky.network'

  # @param service [String, nil] hostname of the Jetstream service to connect to;
  #   DEFAULT_JETSTREAM is used when nil
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_JETSTREAM

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
  end

  # Creates the stream, registers the callbacks and connects.
  # Does nothing if a stream has already been started.
  def start
    return if @sky

    # always call this, since it also creates the subscription row on first run
    last_cursor = load_or_init_cursor
    cursor = @replay_events ? last_cursor : nil

    @sky = sky = Skyfall::Jetstream.new(@service, {
      cursor: cursor,

      # we ask Jetstream to only send us post records, since we don't need anything else
      # if you need to process e.g. likes or follows too, update or remove this param
      wanted_collections: ['app.bsky.feed.post'],
    })

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message do |m|
      process_message(m)
    end

    # The cursor must be persisted on disconnect regardless of the logging settings,
    # so this handler is registered unconditionally and only the log line is optional.
    # The local `sky` is used because #stop resets @sky to nil.
    @sky.on_disconnect {
      log "Disconnected." if @log_status

      # don't overwrite the stored position if the stream has no cursor to report
      save_cursor(sky.cursor) if sky.cursor
    }

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }

      @sky.on_connect {
        @replaying = !!cursor
        log "Connected ✓"
      }

      @sky.on_timeout { log "Trying to reconnect..." }
      @sky.on_reconnect { log "Connection lost, reconnecting..." }
      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Disconnects the stream (if any) and forgets it, so that #start can be called again.
  def stop
    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for this service, or creates the subscription
  # record (with cursor 0) and returns nil if there is none yet.
  def load_or_init_cursor
    if (sub = Subscription.find_by(service: @service))
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores the given cursor in the subscription record(s) of this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles a single message from the stream: logs/handles the non-commit
  # message types, and for commits periodically saves the cursor and
  # dispatches each operation by its record type.
  def process_message(msg)
    if msg.type == :info
      # AtProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"

    elsif msg.type == :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"

    elsif msg.type == :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)

    elsif msg.is_a?(Skyfall::Firehose::UnknownMessage)
      # NOTE(review): the stream created in #start is a Skyfall::Jetstream, which presumably
      # uses its own unknown-message class - confirm whether this check can ever match
      log "Unknown message type: #{msg.type} (#{msg.seq})"
    end

    return unless msg.type == :commit

    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    # only write the cursor to the database for a subset of messages
    if msg.seq % 10 == 0
      save_cursor(msg.seq)
    end

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles an account status message; when the account was deleted,
  # removes the feed entries and posts stored for its DID.
  def process_account_message(msg)
    if msg.status == :deleted
      # delete all data we have stored about this account
      FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
      Post.where(repo: msg.did).delete_all
    end
  end

  # Handles a single post operation: destroys the stored post on delete,
  # and on create validates the record, matches it against the feeds and
  # saves it according to the save_posts setting. Any StandardError raised
  # while processing is logged and not propagated.
  def process_post(msg, op)
    if op.action == :delete
      Post.find_by_repo_rkey(op.repo, op.rkey)&.destroy
    end

    return unless op.action == :create

    begin
      if op.raw_record.nil?
        log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(op.raw_record['createdAt'])
      return if post_time < msg.time - 86400
    rescue StandardError
      # a missing or malformed createdAt ends up here
      log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = op.raw_record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    trimmed_record = op.raw_record.dup
    trimmed_record.delete('$type')
    trimmed_record.delete('text')
    trimmed_json = JSON.generate(trimmed_record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: op.raw_record
    )

    unless post.valid?
      if post.errors.has_key?(:data)
        post.trim_too_long_data
      end

      unless post.valid?
        log "Error: post is invalid: #{op.uri} (#{msg.seq}): #{post.errors.to_a.join(', ')}"
        return
      end
    end

    matched = false

    @feeds.each do |feed|
      if feed.post_matches?(post)
        FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) if @save_posts
        matched = true
      end
    end

    if @log_posts == :all || @log_posts && matched
      puts
      puts text
    end

    post.save! if @save_posts == :all

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log "Error in #process_post: #{e}"

    # skip the verbose dump for this one known error message
    unless e.message == "nesting of 100 is too deep"
      log msg.inspect
      log e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  # Prints a timestamped line; when progress dots are enabled, first ends
  # the current line of dots.
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Compact description of the object which leaves out the list of feeds.
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end
end