1import WebSocket from "ws";
2import { Logger } from "./logger";
3import { healthMonitor } from "./healthCheck";
4
5interface WebSocketClientOptions {
6 /** The URL of the WebSocket server to connect to. */
7 service: string | string[];
8 /** The interval in milliseconds to wait before attempting to reconnect when the connection closes. Default is 5000ms. */
9 reconnectInterval?: number;
10 /** The interval in milliseconds for sending ping messages (heartbeats) to keep the connection alive. Default is 10000ms. */
11 pingInterval?: number;
12 /** Maximum number of consecutive reconnection attempts per service. Default is 3. */
13 maxReconnectAttempts?: number;
14 /** Maximum delay between reconnection attempts in milliseconds. Default is 30000ms (30 seconds). */
15 maxReconnectDelay?: number;
16 /** Exponential backoff factor for reconnection delays. Default is 1.5. */
17 backoffFactor?: number;
18 /** Maximum number of attempts to cycle through all services before giving up. Default is 2. */
19 maxServiceCycles?: number;
20}
21
22/**
23 * A WebSocket client that automatically attempts to reconnect upon disconnection
24 * and periodically sends ping messages (heartbeats) to ensure the connection remains alive.
25 *
26 * Extend this class and override the protected `onOpen`, `onMessage`, `onError`, and `onClose` methods
27 * to implement custom handling of WebSocket events.
28 */
29export class WebSocketClient {
30 private service: string | string[];
31 private reconnectInterval: number;
32 private pingInterval: number;
33 private ws: WebSocket | null = null;
34 private pingTimeout: NodeJS.Timeout | null = null;
35 private serviceIndex = 0;
36 private reconnectAttempts = 0;
37 private serviceCycles = 0;
38 private maxReconnectAttempts: number;
39 private maxServiceCycles: number;
40 private maxReconnectDelay: number;
41 private backoffFactor: number;
42 private reconnectTimeout: NodeJS.Timeout | null = null;
43 private isConnecting = false;
44 private shouldReconnect = true;
45 private messageCount = 0;
46 private lastMessageTime = 0;
47 private healthCheckName: string;
48
49 /**
50 * Creates a new instance of `WebSocketClient`.
51 *
52 * @param options - Configuration options for the WebSocket client, including URL, reconnect interval, and ping interval.
53 */
54 constructor(options: WebSocketClientOptions) {
55 this.service = options.service;
56 this.reconnectInterval = options.reconnectInterval || 5000;
57 this.pingInterval = options.pingInterval || 10000;
58 this.maxReconnectAttempts = options.maxReconnectAttempts || 3;
59 this.maxServiceCycles = options.maxServiceCycles || 2;
60 this.maxReconnectDelay = options.maxReconnectDelay || 30000;
61 this.backoffFactor = options.backoffFactor || 1.5;
62
63 // Generate unique health check name
64 this.healthCheckName = `websocket_${Date.now()}_${Math.random().toString(36).substring(2, 7)}`;
65
66 try {
67 // Register health check
68 healthMonitor.registerHealthCheck(this.healthCheckName, async () => {
69 return this.getConnectionState() === "CONNECTED";
70 });
71
72 // Initialize metrics
73 healthMonitor.setMetric(`${this.healthCheckName}_messages_received`, 0);
74 healthMonitor.setMetric(`${this.healthCheckName}_reconnect_attempts`, 0);
75 } catch (error) {
76 Logger.error("Error initializing health monitoring:", error);
77 }
78
79 this.run();
80 }
81
82 /**
83 * Initiates a WebSocket connection to the specified URL.
84 *
85 * This method sets up event listeners for `open`, `message`, `error`, and `close` events.
86 * When the connection opens, it starts the heartbeat mechanism.
87 * On close, it attempts to reconnect after a specified interval.
88 */
89 private run() {
90 if (this.isConnecting) {
91 return;
92 }
93
94 this.isConnecting = true;
95 const currentService = Array.isArray(this.service)
96 ? this.service[this.serviceIndex]
97 : this.service;
98
99 try {
100 Logger.info(`Attempting to connect to WebSocket: ${currentService}`);
101 this.ws = new WebSocket(currentService);
102
103 this.ws.on("open", () => {
104 try {
105 Logger.info("WebSocket connected successfully", {
106 service: this.getCurrentService(),
107 serviceIndex: this.serviceIndex,
108 });
109 this.isConnecting = false;
110 this.reconnectAttempts = 0; // Reset on successful connection
111 this.serviceCycles = 0; // Reset cycles on successful connection
112 try {
113 healthMonitor.setMetric(
114 `${this.healthCheckName}_reconnect_attempts`,
115 this.reconnectAttempts
116 );
117 } catch (healthError) {
118 Logger.error("Error updating health metrics:", healthError);
119 }
120 this.startHeartbeat();
121 this.onOpen();
122 } catch (error) {
123 Logger.error("Error in WebSocket open handler:", error);
124 this.isConnecting = false;
125 }
126 });
127
128 this.ws.on("message", (data: WebSocket.Data) => {
129 try {
130 this.messageCount++;
131 this.lastMessageTime = Date.now();
132 try {
133 healthMonitor.incrementMetric(`${this.healthCheckName}_messages_received`);
134 } catch (healthError) {
135 Logger.debug("Error updating message count metric:", healthError);
136 }
137 this.onMessage(data);
138 } catch (error) {
139 Logger.error("Error processing WebSocket message:", error);
140 }
141 });
142
143 this.ws.on("error", error => {
144 Logger.error("WebSocket error:", error);
145 this.isConnecting = false;
146 try {
147 this.onError(error);
148 } catch (handlerError) {
149 Logger.error("Error in WebSocket error handler:", handlerError);
150 }
151 });
152
153 this.ws.on("close", (code, reason) => {
154 try {
155 Logger.info(`WebSocket disconnected. Code: ${code}, Reason: ${reason.toString()}`);
156 this.isConnecting = false;
157 this.stopHeartbeat();
158 this.onClose();
159
160 if (this.shouldReconnect) {
161 this.scheduleReconnect();
162 }
163 } catch (error) {
164 Logger.error("Error in WebSocket close handler:", error);
165 this.isConnecting = false;
166 }
167 });
168 } catch (error) {
169 Logger.error("Error creating WebSocket connection:", error);
170 this.isConnecting = false;
171
172 // Schedule reconnect on connection creation failure
173 if (this.shouldReconnect) {
174 this.scheduleReconnect();
175 }
176 }
177 }
178
179 /**
180 * Attempts to reconnect to the WebSocket server after the specified `reconnectInterval`.
181 * It clears all event listeners on the old WebSocket and initiates a new connection.
182 */
183 private scheduleReconnect() {
184 this.reconnectAttempts++;
185 try {
186 healthMonitor.setMetric(`${this.healthCheckName}_reconnect_attempts`, this.reconnectAttempts);
187 } catch (error) {
188 Logger.debug("Error updating reconnect attempts metric:", error);
189 }
190
191 // Check if we should try the next service
192 if (this.reconnectAttempts >= this.maxReconnectAttempts) {
193 if (this.shouldTryNextService()) {
194 this.moveToNextService();
195 return; // Try next service immediately
196 } else {
197 Logger.error("All services exhausted after maximum cycles", {
198 totalServices: Array.isArray(this.service) ? this.service.length : 1,
199 maxServiceCycles: this.maxServiceCycles,
200 serviceCycles: this.serviceCycles,
201 });
202 return; // Give up entirely
203 }
204 }
205
206 const delay = Math.min(
207 this.reconnectInterval * Math.pow(this.backoffFactor, this.reconnectAttempts - 1),
208 this.maxReconnectDelay
209 );
210
211 Logger.info(
212 `Scheduling reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} for service`,
213 {
214 service: this.getCurrentService(),
215 serviceIndex: this.serviceIndex,
216 delay: `${delay}ms`,
217 }
218 );
219
220 if (this.reconnectTimeout) {
221 clearTimeout(this.reconnectTimeout);
222 }
223
224 this.reconnectTimeout = setTimeout(() => {
225 this.cleanup();
226 this.run();
227 }, delay);
228 }
229
230 /**
231 * Check if we should try the next service in the array.
232 */
233 private shouldTryNextService(): boolean {
234 if (!Array.isArray(this.service)) {
235 return false; // Single service, can't switch
236 }
237
238 return this.serviceCycles < this.maxServiceCycles;
239 }
240
241 /**
242 * Move to the next service in the array and reset reconnection attempts.
243 */
244 private moveToNextService() {
245 if (!Array.isArray(this.service)) {
246 return;
247 }
248
249 const previousIndex = this.serviceIndex;
250 this.serviceIndex = (this.serviceIndex + 1) % this.service.length;
251
252 // If we've gone through all services once, increment the cycle counter
253 if (this.serviceIndex === 0) {
254 this.serviceCycles++;
255 }
256
257 this.reconnectAttempts = 0; // Reset attempts for the new service
258
259 Logger.info("Switching to next service", {
260 previousService: this.service[previousIndex],
261 previousIndex,
262 newService: this.getCurrentService(),
263 newIndex: this.serviceIndex,
264 serviceCycle: this.serviceCycles,
265 });
266
267 // Try the new service immediately
268 this.cleanup();
269 this.run();
270 }
271
272 private cleanup() {
273 if (this.ws) {
274 this.ws.removeAllListeners();
275 if (this.ws.readyState === WebSocket.OPEN) {
276 this.ws.close();
277 }
278 this.ws = null;
279 }
280
281 if (this.reconnectTimeout) {
282 clearTimeout(this.reconnectTimeout);
283 this.reconnectTimeout = null;
284 }
285 }
286
287 /**
288 * Starts sending periodic ping messages to the server.
289 *
290 * This function uses `setInterval` to send a ping at the configured `pingInterval`.
291 * If the WebSocket is not open, pings are not sent.
292 */
293 private startHeartbeat() {
294 this.pingTimeout = setInterval(() => {
295 try {
296 if (this.ws && this.ws.readyState === WebSocket.OPEN) {
297 this.ws.ping();
298 }
299 } catch (error) {
300 Logger.error("Error sending WebSocket ping:", error);
301 }
302 }, this.pingInterval);
303 }
304
305 /**
306 * Stops sending heartbeat pings by clearing the ping interval.
307 */
308 private stopHeartbeat() {
309 if (this.pingTimeout) {
310 clearInterval(this.pingTimeout);
311 this.pingTimeout = null;
312 }
313 }
314
315 /**
316 * Called when the WebSocket connection is successfully opened.
317 *
318 * Override this method in a subclass to implement custom logic on connection.
319 */
320 protected onOpen() {
321 // Custom logic for connection open
322 }
323
324 /**
325 * Called when a WebSocket message is received.
326 *
327 * @param data - The data received from the WebSocket server.
328 *
329 * Override this method in a subclass to implement custom message handling.
330 */
331 protected onMessage(_data: WebSocket.Data) {
332 // Custom logic for handling received messages
333 }
334
335 /**
336 * Called when a WebSocket error occurs.
337 *
338 * @param error - The error that occurred.
339 *
340 * Override this method in a subclass to implement custom error handling.
341 * Note: Service switching is now handled in the reconnection logic, not here.
342 */
343 protected onError(_error: Error) {
344 // Custom logic for handling errors - override in subclasses
345 // Service switching is handled automatically in scheduleReconnect()
346 }
347
348 /**
349 * Called when the WebSocket connection is closed.
350 *
351 * Override this method in a subclass to implement custom logic on disconnection.
352 */
353 protected onClose() {
354 // Custom logic for handling connection close
355 }
356
357 /**
358 * Sends data to the connected WebSocket server, if the connection is open.
359 *
360 * @param data - The data to send.
361 * @returns true if the message was sent successfully, false otherwise.
362 */
363 public send(data: string | Buffer | ArrayBuffer | Buffer[]): boolean {
364 try {
365 if (this.ws && this.ws.readyState === WebSocket.OPEN) {
366 this.ws.send(data);
367 return true;
368 } else {
369 Logger.debug("Cannot send message: WebSocket not connected", {
370 readyState: this.ws?.readyState,
371 service: this.getCurrentService(),
372 });
373 return false;
374 }
375 } catch (error) {
376 Logger.error("Error sending WebSocket message:", error);
377 return false;
378 }
379 }
380
381 /**
382 * Closes the WebSocket connection gracefully.
383 */
384 public close() {
385 this.shouldReconnect = false;
386 this.stopHeartbeat();
387
388 if (this.reconnectTimeout) {
389 clearTimeout(this.reconnectTimeout);
390 this.reconnectTimeout = null;
391 }
392
393 if (this.ws) {
394 try {
395 this.ws.close();
396 } catch (error) {
397 Logger.error("Error closing WebSocket:", error);
398 }
399 }
400
401 // Unregister health check when closing
402 try {
403 healthMonitor.unregisterHealthCheck(this.healthCheckName);
404 } catch (error) {
405 Logger.error("Error unregistering health check:", error);
406 }
407 }
408
409 public getConnectionState(): string {
410 if (!this.ws) return "DISCONNECTED";
411
412 switch (this.ws.readyState) {
413 case WebSocket.CONNECTING:
414 return "CONNECTING";
415 case WebSocket.OPEN:
416 return "CONNECTED";
417 case WebSocket.CLOSING:
418 return "CLOSING";
419 case WebSocket.CLOSED:
420 return "DISCONNECTED";
421 default:
422 return "UNKNOWN";
423 }
424 }
425
426 public getReconnectAttempts(): number {
427 return this.reconnectAttempts;
428 }
429
430 public getServiceCycles(): number {
431 return this.serviceCycles;
432 }
433
434 public getServiceIndex(): number {
435 return this.serviceIndex;
436 }
437
438 public getAllServices(): string[] {
439 return Array.isArray(this.service) ? [...this.service] : [this.service];
440 }
441
442 public getCurrentService(): string {
443 return Array.isArray(this.service) ? this.service[this.serviceIndex] : this.service;
444 }
445
446 public getMessageCount(): number {
447 return this.messageCount;
448 }
449
450 public getLastMessageTime(): number {
451 return this.lastMessageTime;
452 }
453
454 public getHealthCheckName(): string {
455 return this.healthCheckName;
456 }
457}