An AI agent built to do Ralph loops - plan mode for planning and ralph mode for implementing.
at new-directions 188 lines 5.0 kB view raw
1/** 2 * WebSocket connection handler for Rustagent V2 daemon. 3 * Manages persistent WebSocket connection to /ws with automatic reconnection 4 * and event dispatching via callbacks. 5 */ 6 7import type { WsEvent } from '../types'; 8 9/** 10 * Set of known WebSocket event types for validation. 11 * Used in handleMessage() for O(1) forward-compatibility checking. 12 */ 13const KNOWN_EVENT_TYPES = new Set<string>([ 14 'agent_spawned', 15 'agent_progress', 16 'agent_completed', 17 'node_created', 18 'node_status_changed', 19 'edge_created', 20 'session_ended', 21 'tool_execution', 22 'orchestrator_state_changed', 23]); 24 25/** 26 * Manages a WebSocket connection to the daemon. 27 * Handles automatic reconnection with exponential backoff. 28 */ 29export class WsConnection { 30 private ws: WebSocket | null = null; 31 private listeners: Set<(event: WsEvent) => void> = new Set(); 32 private wsUrl: string; 33 private reconnectTimer: ReturnType<typeof setTimeout> | null = null; 34 private backoffMs: number = 1000; // Start at 1 second 35 private shouldReconnect: boolean = false; 36 37 /** 38 * Constructor. 39 * @param wsUrl URL for the WebSocket. If not provided, auto-detects from window.location. 40 * Use 'ws://' or 'wss://' based on page protocol, pointing to '/ws'. 41 */ 42 constructor(wsUrl?: string) { 43 if (wsUrl) { 44 this.wsUrl = wsUrl; 45 } else { 46 // Auto-detect from window.location (browser environment) 47 if (typeof window !== 'undefined' && window.location) { 48 const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; 49 const host = window.location.host; 50 this.wsUrl = `${protocol}//${host}/ws`; 51 } else { 52 // Fallback for non-browser environments 53 this.wsUrl = 'ws://localhost/ws'; 54 } 55 } 56 } 57 58 /** 59 * Current connection state. 60 */ 61 get connected(): boolean { 62 return this.ws !== null && this.ws.readyState === 1; // WebSocket.OPEN = 1 63 } 64 65 /** 66 * Register a callback for all WebSocket events. 67 */ 68 onEvent(callback: (event: WsEvent) => void): void { 69 this.listeners.add(callback); 70 } 71 72 /** 73 * Unregister a callback. 74 */ 75 offEvent(callback: (event: WsEvent) => void): void { 76 this.listeners.delete(callback); 77 } 78 79 /** 80 * Open the WebSocket connection. 81 * Automatically handles reconnection on close/error. 82 */ 83 connect(): void { 84 if (this.ws) { 85 return; // Already connected or connecting 86 } 87 88 this.shouldReconnect = true; 89 this.ws = new WebSocket(this.wsUrl); 90 91 this.ws.onopen = () => { 92 // Reset backoff on successful connection 93 this.backoffMs = 1000; 94 }; 95 96 this.ws.onmessage = (event: MessageEvent) => { 97 this.handleMessage(event.data); 98 }; 99 100 this.ws.onerror = () => { 101 // Error will trigger onclose, so we don't need to handle reconnection here 102 }; 103 104 this.ws.onclose = () => { 105 this.ws = null; 106 if (this.shouldReconnect) { 107 this.scheduleReconnect(); 108 } 109 }; 110 } 111 112 /** 113 * Close the WebSocket connection and stop reconnection attempts. 114 */ 115 disconnect(): void { 116 this.shouldReconnect = false; 117 if (this.reconnectTimer) { 118 clearTimeout(this.reconnectTimer); 119 this.reconnectTimer = null; 120 } 121 if (this.ws) { 122 this.ws.close(); 123 this.ws = null; 124 } 125 } 126 127 /** 128 * Schedule a reconnection attempt with exponential backoff. 129 * Backoff: 1s, 2s, 4s, 8s, 16s, 30s max. 130 */ 131 private scheduleReconnect(): void { 132 if (this.reconnectTimer) { 133 clearTimeout(this.reconnectTimer); 134 } 135 136 // Cap backoff at 30 seconds 137 const delayMs = Math.min(this.backoffMs, 30000); 138 139 this.reconnectTimer = setTimeout(() => { 140 this.reconnectTimer = null; 141 if (this.shouldReconnect) { 142 // Double backoff for next attempt (up to cap) 143 if (this.backoffMs < 30000) { 144 this.backoffMs *= 2; 145 } 146 this.connect(); 147 } 148 }, delayMs); 149 } 150 151 /** 152 * Handle incoming message from WebSocket. 153 * Parses JSON, validates event shape, and dispatches to listeners. 154 */ 155 private handleMessage(data: string): void { 156 try { 157 const parsed = JSON.parse(data); 158 159 // Validate that it has a type field 160 if (!parsed.type || typeof parsed.type !== 'string') { 161 console.warn('WebSocket message missing or invalid type field:', parsed); 162 return; 163 } 164 165 // Type-narrow to WsEvent based on type field 166 // Check against known event types for forward compatibility 167 if (!KNOWN_EVENT_TYPES.has(parsed.type)) { 168 console.warn('WebSocket message with unknown event type:', parsed.type); 169 return; 170 } 171 172 // Dispatch to all listeners 173 const event = parsed as WsEvent; 174 for (const listener of this.listeners) { 175 listener(event); 176 } 177 } catch (error) { 178 console.warn('Failed to parse WebSocket message:', data, error); 179 } 180 } 181} 182 183/** 184 * Singleton-style factory for creating a WebSocket connection. 185 */ 186export function createWsConnection(wsUrl?: string): WsConnection { 187 return new WsConnection(wsUrl); 188}