Bots to use with the firehose/jetstream
at main 457 lines 14 kB view raw
import WebSocket from "ws";
import { Logger } from "./logger";
import { healthMonitor } from "./healthCheck";

interface WebSocketClientOptions {
  /** The URL of the WebSocket server to connect to, or a list of URLs to fail over between. */
  service: string | string[];
  /** The interval in milliseconds to wait before attempting to reconnect when the connection closes. Default is 5000ms. */
  reconnectInterval?: number;
  /** The interval in milliseconds for sending ping messages (heartbeats) to keep the connection alive. Default is 10000ms. */
  pingInterval?: number;
  /** Maximum number of consecutive reconnection attempts per service. Default is 3. */
  maxReconnectAttempts?: number;
  /** Maximum delay between reconnection attempts in milliseconds. Default is 30000ms (30 seconds). */
  maxReconnectDelay?: number;
  /** Exponential backoff factor for reconnection delays. Default is 1.5. */
  backoffFactor?: number;
  /** Maximum number of attempts to cycle through all services before giving up. Default is 2. */
  maxServiceCycles?: number;
}

/**
 * A WebSocket client that automatically attempts to reconnect upon disconnection
 * and periodically sends ping messages (heartbeats) to ensure the connection remains alive.
 *
 * Reconnection uses exponential backoff per service; when a list of services is
 * configured, the client rotates to the next one after `maxReconnectAttempts`
 * consecutive failures and gives up after `maxServiceCycles` full rotations.
 *
 * Extend this class and override the protected `onOpen`, `onMessage`, `onError`, and `onClose` methods
 * to implement custom handling of WebSocket events.
 */
export class WebSocketClient {
  private service: string | string[];
  private reconnectInterval: number;
  private pingInterval: number;
  private ws: WebSocket | null = null;
  private pingTimeout: NodeJS.Timeout | null = null;
  private serviceIndex = 0;
  private reconnectAttempts = 0;
  private serviceCycles = 0;
  private maxReconnectAttempts: number;
  private maxServiceCycles: number;
  private maxReconnectDelay: number;
  private backoffFactor: number;
  private reconnectTimeout: NodeJS.Timeout | null = null;
  private isConnecting = false;
  private shouldReconnect = true;
  private messageCount = 0;
  private lastMessageTime = 0;
  private healthCheckName: string;

  /**
   * Creates a new instance of `WebSocketClient` and immediately starts connecting.
   *
   * Numeric options fall back to their defaults when omitted or falsy, so
   * passing `0` for any of them selects the default rather than zero.
   *
   * @param options - Configuration options for the WebSocket client, including URL, reconnect interval, and ping interval.
   */
  constructor(options: WebSocketClientOptions) {
    this.service = options.service;
    this.reconnectInterval = options.reconnectInterval || 5000;
    this.pingInterval = options.pingInterval || 10000;
    this.maxReconnectAttempts = options.maxReconnectAttempts || 3;
    this.maxServiceCycles = options.maxServiceCycles || 2;
    this.maxReconnectDelay = options.maxReconnectDelay || 30000;
    this.backoffFactor = options.backoffFactor || 1.5;

    // Generate unique health check name
    this.healthCheckName = `websocket_${Date.now()}_${Math.random().toString(36).substring(2, 7)}`;

    try {
      // Register health check
      healthMonitor.registerHealthCheck(this.healthCheckName, async () => {
        return this.getConnectionState() === "CONNECTED";
      });

      // Initialize metrics
      healthMonitor.setMetric(`${this.healthCheckName}_messages_received`, 0);
      healthMonitor.setMetric(`${this.healthCheckName}_reconnect_attempts`, 0);
    } catch (error) {
      // Health monitoring is best-effort; the connection is still attempted.
      Logger.error("Error initializing health monitoring:", error);
    }

    this.run();
  }

  /**
   * Initiates a WebSocket connection to the current service URL.
   *
   * This method sets up event listeners for `open`, `message`, `error`, and `close` events.
   * When the connection opens, it starts the heartbeat mechanism.
   * On close, it schedules a reconnect unless `close()` has been called.
   * It is a no-op while a connection attempt is already in flight.
   */
  private run() {
    if (this.isConnecting) {
      return;
    }

    // An empty service list can never connect; retrying would loop forever
    // because the service index becomes NaN and cycles are never counted.
    if (Array.isArray(this.service) && this.service.length === 0) {
      Logger.error("No WebSocket services configured; not connecting");
      return;
    }

    this.isConnecting = true;
    const currentService = Array.isArray(this.service)
      ? this.service[this.serviceIndex]
      : this.service;

    try {
      Logger.info(`Attempting to connect to WebSocket: ${currentService}`);
      const socket = new WebSocket(currentService);
      this.ws = socket;

      socket.on("open", () => {
        try {
          Logger.info("WebSocket connected successfully", {
            service: this.getCurrentService(),
            serviceIndex: this.serviceIndex,
          });
          this.isConnecting = false;
          this.reconnectAttempts = 0; // Reset on successful connection
          this.serviceCycles = 0; // Reset cycles on successful connection
          try {
            healthMonitor.setMetric(
              `${this.healthCheckName}_reconnect_attempts`,
              this.reconnectAttempts
            );
          } catch (healthError) {
            Logger.error("Error updating health metrics:", healthError);
          }
          this.startHeartbeat();
          this.onOpen();
        } catch (error) {
          Logger.error("Error in WebSocket open handler:", error);
          this.isConnecting = false;
        }
      });

      socket.on("message", (data: WebSocket.Data) => {
        try {
          this.messageCount++;
          this.lastMessageTime = Date.now();
          try {
            healthMonitor.incrementMetric(`${this.healthCheckName}_messages_received`);
          } catch (healthError) {
            Logger.debug("Error updating message count metric:", healthError);
          }
          this.onMessage(data);
        } catch (error) {
          Logger.error("Error processing WebSocket message:", error);
        }
      });

      socket.on("error", error => {
        Logger.error("WebSocket error:", error);
        this.isConnecting = false;
        try {
          this.onError(error);
        } catch (handlerError) {
          Logger.error("Error in WebSocket error handler:", handlerError);
        }
      });

      socket.on("close", (code, reason) => {
        try {
          Logger.info(`WebSocket disconnected. Code: ${code}, Reason: ${reason.toString()}`);
          this.isConnecting = false;
          this.stopHeartbeat();

          // Isolate the subclass hook: a throwing onClose() must not prevent
          // the reconnect below from being scheduled.
          try {
            this.onClose();
          } catch (handlerError) {
            Logger.error("Error in WebSocket close handler:", handlerError);
          }

          if (this.shouldReconnect) {
            this.scheduleReconnect();
          }
        } catch (error) {
          Logger.error("Error in WebSocket close handler:", error);
          this.isConnecting = false;
        }
      });
    } catch (error) {
      Logger.error("Error creating WebSocket connection:", error);
      this.isConnecting = false;

      // Schedule reconnect on connection creation failure
      if (this.shouldReconnect) {
        this.scheduleReconnect();
      }
    }
  }

  /**
   * Records a failed attempt and decides what to do next.
   *
   * Once `maxReconnectAttempts` is reached it either switches to the next
   * service immediately or, when no further service/cycle is available, gives
   * up. Otherwise it schedules a reconnect after an exponentially growing
   * delay (`reconnectInterval * backoffFactor^(attempt - 1)`), capped at
   * `maxReconnectDelay`.
   */
  private scheduleReconnect() {
    this.reconnectAttempts++;
    try {
      healthMonitor.setMetric(`${this.healthCheckName}_reconnect_attempts`, this.reconnectAttempts);
    } catch (error) {
      Logger.debug("Error updating reconnect attempts metric:", error);
    }

    // Check if we should try the next service
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      if (this.shouldTryNextService()) {
        this.moveToNextService();
        return; // Try next service immediately
      } else {
        Logger.error("All services exhausted after maximum cycles", {
          totalServices: Array.isArray(this.service) ? this.service.length : 1,
          maxServiceCycles: this.maxServiceCycles,
          serviceCycles: this.serviceCycles,
        });
        return; // Give up entirely
      }
    }

    const delay = Math.min(
      this.reconnectInterval * Math.pow(this.backoffFactor, this.reconnectAttempts - 1),
      this.maxReconnectDelay
    );

    Logger.info(
      `Scheduling reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} for service`,
      {
        service: this.getCurrentService(),
        serviceIndex: this.serviceIndex,
        delay: `${delay}ms`,
      }
    );

    // Only one reconnect timer may be pending at a time.
    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
    }

    this.reconnectTimeout = setTimeout(() => {
      this.cleanup();
      this.run();
    }, delay);
  }

  /**
   * Check if we should try the next service in the array.
   *
   * @returns `true` when a service list is configured and the cycle budget is not yet spent.
   */
  private shouldTryNextService(): boolean {
    if (!Array.isArray(this.service)) {
      return false; // Single service, can't switch
    }

    return this.serviceCycles < this.maxServiceCycles;
  }

  /**
   * Move to the next service in the array, reset reconnection attempts, and
   * connect to it immediately. Wrapping back to index 0 counts as one cycle.
   */
  private moveToNextService() {
    // Nothing to rotate through for a single URL; an empty list would make
    // the modulo below produce NaN.
    if (!Array.isArray(this.service) || this.service.length === 0) {
      return;
    }

    const previousIndex = this.serviceIndex;
    this.serviceIndex = (this.serviceIndex + 1) % this.service.length;

    // If we've gone through all services once, increment the cycle counter
    if (this.serviceIndex === 0) {
      this.serviceCycles++;
    }

    this.reconnectAttempts = 0; // Reset attempts for the new service

    Logger.info("Switching to next service", {
      previousService: this.service[previousIndex],
      previousIndex,
      newService: this.getCurrentService(),
      newIndex: this.serviceIndex,
      serviceCycle: this.serviceCycles,
    });

    // Try the new service immediately
    this.cleanup();
    this.run();
  }

  /**
   * Discards the current socket (if any), its heartbeat, and any pending
   * reconnect timer so that a fresh connection can be started.
   */
  private cleanup() {
    // The `close` listener that normally stops the heartbeat is removed
    // below, so the timer has to be stopped here explicitly.
    this.stopHeartbeat();

    const socket = this.ws;
    if (socket) {
      socket.removeAllListeners();
      // Keep a no-op `error` listener: an `error` event emitted by the
      // discarded socket with no listener attached would be thrown as an
      // uncaught exception.
      socket.on("error", () => {});

      try {
        if (socket.readyState === WebSocket.OPEN) {
          socket.close();
        } else if (socket.readyState === WebSocket.CONNECTING) {
          // Abort the in-flight handshake instead of leaking the connection.
          socket.terminate();
        }
      } catch (error) {
        Logger.debug("Error discarding previous WebSocket:", error);
      }
      this.ws = null;
    }

    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }
  }

  /**
   * Starts sending periodic ping messages to the server.
   *
   * This function uses `setInterval` to send a ping at the configured `pingInterval`.
   * If the WebSocket is not open, pings are not sent. Any previously running
   * heartbeat is stopped first so that at most one interval is ever active.
   */
  private startHeartbeat() {
    this.stopHeartbeat();

    this.pingTimeout = setInterval(() => {
      try {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
          this.ws.ping();
        }
      } catch (error) {
        Logger.error("Error sending WebSocket ping:", error);
      }
    }, this.pingInterval);
  }

  /**
   * Stops sending heartbeat pings by clearing the ping interval.
   */
  private stopHeartbeat() {
    if (this.pingTimeout) {
      clearInterval(this.pingTimeout);
      this.pingTimeout = null;
    }
  }

  /**
   * Called when the WebSocket connection is successfully opened.
   *
   * Override this method in a subclass to implement custom logic on connection.
   */
  protected onOpen() {
    // Custom logic for connection open
  }

  /**
   * Called when a WebSocket message is received.
   *
   * @param data - The data received from the WebSocket server.
   *
   * Override this method in a subclass to implement custom message handling.
   */
  protected onMessage(_data: WebSocket.Data) {
    // Custom logic for handling received messages
  }

  /**
   * Called when a WebSocket error occurs.
   *
   * @param error - The error that occurred.
   *
   * Override this method in a subclass to implement custom error handling.
   * Note: Service switching is now handled in the reconnection logic, not here.
   */
  protected onError(_error: Error) {
    // Custom logic for handling errors - override in subclasses
    // Service switching is handled automatically in scheduleReconnect()
  }

  /**
   * Called when the WebSocket connection is closed.
   *
   * Override this method in a subclass to implement custom logic on disconnection.
   * Exceptions thrown here are logged and do not prevent reconnection.
   */
  protected onClose() {
    // Custom logic for handling connection close
  }

  /**
   * Sends data to the connected WebSocket server, if the connection is open.
   *
   * @param data - The data to send.
   * @returns true if the message was sent successfully, false otherwise.
   */
  public send(data: string | Buffer | ArrayBuffer | Buffer[]): boolean {
    try {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.ws.send(data);
        return true;
      } else {
        Logger.debug("Cannot send message: WebSocket not connected", {
          readyState: this.ws?.readyState,
          service: this.getCurrentService(),
        });
        return false;
      }
    } catch (error) {
      Logger.error("Error sending WebSocket message:", error);
      return false;
    }
  }

  /**
   * Closes the WebSocket connection gracefully and disables reconnection.
   *
   * Also cancels any pending reconnect timer and unregisters the health check.
   */
  public close() {
    this.shouldReconnect = false;
    this.stopHeartbeat();

    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }

    if (this.ws) {
      try {
        this.ws.close();
      } catch (error) {
        Logger.error("Error closing WebSocket:", error);
      }
    }

    // Unregister health check when closing
    try {
      healthMonitor.unregisterHealthCheck(this.healthCheckName);
    } catch (error) {
      Logger.error("Error unregistering health check:", error);
    }
  }

  /**
   * Maps the underlying socket's `readyState` to a readable label.
   *
   * @returns `"CONNECTING"`, `"CONNECTED"`, `"CLOSING"`, `"DISCONNECTED"` (also when no socket exists), or `"UNKNOWN"`.
   */
  public getConnectionState(): string {
    if (!this.ws) return "DISCONNECTED";

    switch (this.ws.readyState) {
      case WebSocket.CONNECTING:
        return "CONNECTING";
      case WebSocket.OPEN:
        return "CONNECTED";
      case WebSocket.CLOSING:
        return "CLOSING";
      case WebSocket.CLOSED:
        return "DISCONNECTED";
      default:
        return "UNKNOWN";
    }
  }

  /** @returns The number of consecutive failed attempts against the current service. */
  public getReconnectAttempts(): number {
    return this.reconnectAttempts;
  }

  /** @returns How many times the client has wrapped around the service list since the last successful connection. */
  public getServiceCycles(): number {
    return this.serviceCycles;
  }

  /** @returns The index of the service currently in use. */
  public getServiceIndex(): number {
    return this.serviceIndex;
  }

  /** @returns A copy of the configured service URLs, always as an array. */
  public getAllServices(): string[] {
    return Array.isArray(this.service) ? [...this.service] : [this.service];
  }

  /** @returns The URL of the service currently in use. */
  public getCurrentService(): string {
    return Array.isArray(this.service) ? this.service[this.serviceIndex] : this.service;
  }

  /** @returns The total number of messages received by this client. */
  public getMessageCount(): number {
    return this.messageCount;
  }

  /** @returns The `Date.now()` timestamp of the most recent message, or 0 if none has been received. */
  public getLastMessageTime(): number {
    return this.lastMessageTime;
  }

  /** @returns The unique name under which this client's health check is registered. */
  public getHealthCheckName(): string {
    return this.healthCheckName;
  }
}