A Ruby gem for streaming data from the Bluesky/ATProto firehose

exponential backoff on reconnection errors

+19 -2
+19 -2
lib/skyfall/stream.rb
··· 12 :subscribe_repos => SUBSCRIBE_REPOS 13 } 14 15 attr_accessor :heartbeat_timeout, :heartbeat_interval, :cursor, :auto_reconnect 16 17 def initialize(server, endpoint, cursor = nil) ··· 24 @heartbeat_timeout = 30 25 @last_update = nil 26 @auto_reconnect = true 27 end 28 29 def connect ··· 46 end 47 48 @ws.on(:message) do |msg| 49 data = msg.data.pack('C*') 50 @handlers[:raw_message]&.call(data) 51 ··· 64 65 @ws.on(:close) do |e| 66 @ws = nil 67 if @auto_reconnect && @engines_on 68 - @handlers[:reconnect]&.call(e) 69 - connect 70 else 71 @engines_on = false 72 @handlers[:disconnect]&.call(e) ··· 115 116 117 private 118 119 def build_websocket_url 120 url = "wss://#{@server}/xrpc/#{@endpoint}"
··· 12 :subscribe_repos => SUBSCRIBE_REPOS 13 } 14 15 + MAX_RECONNECT_INTERVAL = 300 16 + 17 attr_accessor :heartbeat_timeout, :heartbeat_interval, :cursor, :auto_reconnect 18 19 def initialize(server, endpoint, cursor = nil) ··· 26 @heartbeat_timeout = 30 27 @last_update = nil 28 @auto_reconnect = true 29 + @connection_attempts = 0 30 end 31 32 def connect ··· 49 end 50 51 @ws.on(:message) do |msg| 52 + @connection_attempts = 0 53 + 54 data = msg.data.pack('C*') 55 @handlers[:raw_message]&.call(data) 56 ··· 69 70 @ws.on(:close) do |e| 71 @ws = nil 72 + 73 if @auto_reconnect && @engines_on 74 + EM.add_timer(reconnect_delay) do 75 + @connection_attempts += 1 76 + @handlers[:reconnect]&.call(e) 77 + connect 78 + end 79 else 80 @engines_on = false 81 @handlers[:disconnect]&.call(e) ··· 124 125 126 private 127 + 128 + def reconnect_delay 129 + if @connection_attempts == 0 130 + 0 131 + else 132 + [2 ** (@connection_attempts - 1), MAX_RECONNECT_INTERVAL].min 133 + end 134 + end 135 136 def build_websocket_url 137 url = "wss://#{@server}/xrpc/#{@endpoint}"