# Template of a custom feed generator service for the Bluesky network in Ruby
require 'json'
require 'time'
require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'

# Consumes a relay's :subscribe_repos event stream through Skyfall, matches
# newly created posts against the configured feeds and stores the results.
#
# Tunable options (all have environment-dependent defaults set in #initialize):
#
# * show_progress - print a '.' for every processed post (unless log_posts is :all)
# * log_status    - print connection status messages (connecting, connected, errors...)
# * log_posts     - :all prints the text of every post, any other truthy value
#                   (e.g. :matching) prints only posts matching a feed, false prints none
# * save_posts    - :all saves every post, any other truthy value (e.g. :matching)
#                   saves only posts matching a feed, false saves nothing
# * replay_events - when true, resume from the cursor stored in the subscriptions table
class FirehoseStream
  attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_RELAY = 'bsky.network'

  # @param service [String, nil] hostname of the relay to connect to;
  #   DEFAULT_RELAY is used when nil
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_RELAY

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
  end

  # Opens the stream and starts processing messages. Does nothing if a stream
  # has already been started and not stopped.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? last_cursor : nil

    # the local `sky` keeps the stream reachable from the disconnect handler
    # even after #stop has reset @sky to nil
    @sky = sky = Skyfall::Stream.new(@service, :subscribe_repos, cursor)

    @sky.on_message { |m| process_message(m) }

    # whether to log is decided once, at start time
    log_status = @log_status

    # The cursor must be persisted on disconnect regardless of the logging
    # settings; only the status output depends on log_status.
    @sky.on_disconnect do
      if log_status
        puts
        puts "Disconnected #{Time.now}"
      end

      # the stream may have no cursor yet (disconnected before the first
      # message) - don't overwrite a previously saved value with nil
      last_seq = sky.cursor
      save_cursor(last_seq) if last_seq
    end

    if log_status
      @sky.on_connecting { |u| puts "Connecting to #{u}..." }

      @sky.on_connect do
        @replaying = !!cursor
        puts "Connected #{Time.now}"
      end

      @sky.on_reconnect { puts "Connection lost, reconnecting..." }
      @sky.on_error { |e| puts "ERROR: #{Time.now} #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Disconnects the current stream (if any) and forgets it, so that #start
  # can be called again.
  def stop
    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor saved for this service. If there is no subscription
  # record for the service yet, creates one (with cursor 0) and returns nil.
  def load_or_init_cursor
    if (sub = Subscription.find_by(service: @service))
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores the given cursor in the subscription record(s) of this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles a single message received from the stream. Only :commit messages
  # are processed further; their operations are dispatched by record type.
  def process_message(msg)
    case msg
    when Skyfall::InfoMessage
      # AtProto error, the only one right now is "OutdatedCursor"
      puts "InfoMessage: #{msg}"
    when Skyfall::HandleMessage
      # use these events if you want to track handle changes:
      # puts "Handle change: #{msg.repo} => #{msg.handle}"
    when Skyfall::UnknownMessage
      puts "Unknown message type: #{msg.type} (#{msg.seq})"
    end

    return unless msg.type == :commit

    if @replaying
      puts "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    # persist the position every 10th sequence number
    save_cursor(msg.seq) if msg.seq % 10 == 0

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles a post operation: removes the stored post on :delete, and on
  # :create builds a Post, checks it against every feed and saves/logs it
  # according to the save_posts / log_posts settings. Other actions are ignored.
  # Any StandardError raised while processing is logged and swallowed.
  def process_post(msg, op)
    if op.action == :delete
      Post.find_by(repo: op.repo, rkey: op.rkey)&.destroy
    end

    return unless op.action == :create

    begin
      record = op.raw_record

      if record.nil?
        puts "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      puts "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(record['createdAt'])
      return if post_time < msg.time - 86_400 # more than 24 hours older than the event
    rescue StandardError
      puts "Skipping post with invalid timestamp: #{record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    trimmed_record = record.dup
    trimmed_record.delete('$type')
    trimmed_record.delete('text')
    trimmed_json = JSON.generate(trimmed_record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: record
    )

    matched = false

    @feeds.each do |feed|
      next unless feed.post_matches?(post)

      FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) if @save_posts
      matched = true
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    post.save! if @save_posts == :all

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    puts "Error in #process_post: #{e}"

    unless e.message == "nesting of 100 is too deep"
      p msg
      puts e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  # Short debug representation which leaves out the (potentially large) list of feeds.
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end
end