Template of a custom feed generator service for the Bluesky network in Ruby

add a timestamp to all firehose logs

Changed files
+29 -18
app
bin
+20 -16
app/firehose_stream.rb
··· 38 end 39 40 if @log_status 41 - @sky.on_connecting { |u| puts "Connecting to #{u}..." } 42 @sky.on_connect { 43 @replaying = !!(cursor) 44 - puts "Connected #{Time.now} ✓" 45 } 46 @sky.on_disconnect { 47 - puts 48 - puts "Disconnected #{Time.now}" 49 save_cursor(sky.cursor) 50 } 51 - @sky.on_reconnect { puts "Connection lost, reconnecting..." } 52 - @sky.on_error { |e| puts "ERROR: #{Time.now} #{e.class} #{e.message}" } 53 end 54 55 @sky.connect ··· 76 def process_message(msg) 77 if msg.is_a?(Skyfall::InfoMessage) 78 # AtProto error, the only one right now is "OutdatedCursor" 79 - puts "InfoMessage: #{msg}" 80 elsif msg.is_a?(Skyfall::HandleMessage) 81 # use these events if you want to track handle changes: 82 - # puts "Handle change: #{msg.repo} => #{msg.handle}" 83 elsif msg.is_a?(Skyfall::UnknownMessage) 84 - puts "Unknown message type: #{msg.type} (#{msg.seq})" 85 end 86 87 return unless msg.type == :commit 88 89 if @replaying 90 - puts "Replaying events since #{msg.time.getlocal} -->" 91 @replaying = false 92 end 93 ··· 125 126 begin 127 if op.raw_record.nil? 128 - puts "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})" 129 return 130 end 131 rescue CBOR::UnpackError => e 132 - puts "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}" 133 return 134 end 135 ··· 138 post_time = Time.parse(op.raw_record['createdAt']) 139 return if post_time < msg.time - 86400 140 rescue StandardError => e 141 - puts "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})" 142 return 143 end 144 ··· 178 179 print '.' if @show_progress && @log_posts != :all 180 rescue StandardError => e 181 - puts "Error in #process_post: #{e}" 182 183 unless e.message == "nesting of 100 is too deep" 184 - p msg 185 - puts e.backtrace.reject { |x| x.include?('/ruby/') } 186 end 187 end 188 189 def inspect
··· 38 end 39 40 if @log_status 41 + @sky.on_connecting { |u| log "Connecting to #{u}..." } 42 @sky.on_connect { 43 @replaying = !!(cursor) 44 + log "Connected ✓" 45 } 46 @sky.on_disconnect { 47 + log "Disconnected." 48 save_cursor(sky.cursor) 49 } 50 + @sky.on_reconnect { log "Connection lost, reconnecting..." } 51 + @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" } 52 end 53 54 @sky.connect ··· 75 def process_message(msg) 76 if msg.is_a?(Skyfall::InfoMessage) 77 # AtProto error, the only one right now is "OutdatedCursor" 78 + log "InfoMessage: #{msg}" 79 elsif msg.is_a?(Skyfall::HandleMessage) 80 # use these events if you want to track handle changes: 81 + # log "Handle change: #{msg.repo} => #{msg.handle}" 82 elsif msg.is_a?(Skyfall::UnknownMessage) 83 + log "Unknown message type: #{msg.type} (#{msg.seq})" 84 end 85 86 return unless msg.type == :commit 87 88 if @replaying 89 + log "Replaying events since #{msg.time.getlocal} -->" 90 @replaying = false 91 end 92 ··· 124 125 begin 126 if op.raw_record.nil? 127 + log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})" 128 return 129 end 130 rescue CBOR::UnpackError => e 131 + log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}" 132 return 133 end 134 ··· 137 post_time = Time.parse(op.raw_record['createdAt']) 138 return if post_time < msg.time - 86400 139 rescue StandardError => e 140 + log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})" 141 return 142 end 143 ··· 177 178 print '.' if @show_progress && @log_posts != :all 179 rescue StandardError => e 180 + log "Error in #process_post: #{e}" 181 182 unless e.message == "nesting of 100 is too deep" 183 + log msg.inspect 184 + log e.backtrace.reject { |x| x.include?('/ruby/') } 185 end 186 + end 187 + 188 + def log(text) 189 + puts if @show_progress 190 + puts "[#{Time.now}] #{text}" 191 end 192 193 def inspect
+9 -2
bin/firehose
··· 71 end 72 end 73 74 - trap("SIGINT") { puts "Stopping..."; firehose.stop } 75 - trap("SIGTERM") { puts; puts "Shutting down the service..."; firehose.stop } 76 77 firehose.start
··· 71 end 72 end 73 74 + trap("SIGINT") { 75 + firehose.log "Stopping..." 76 + firehose.stop 77 + } 78 + 79 + trap("SIGTERM") { 80 + firehose.log "Shutting down the service..." 81 + firehose.stop 82 + } 83 84 firehose.start