+3
-1
src/jetstream_letta_bridge.py
+3
-1
src/jetstream_letta_bridge.py
···
335
335
console.print(f"[{self.agent_name}] 📊 {self.blips_received} received | {self.messages_sent_to_agent} sent | {self.blips_published} published ({rate:.1f}/min)")
336
336
337
337
except Exception as e:
338
-
console.print(f"[{self.agent_name}] ❌ Agent error: {e}")
338
+
error_name = type(e).__name__
339
+
console.print(f"[{self.agent_name}] ❌ {error_name}: {str(e)[:100]}")
339
340
logger.error(f"[{self.agent_name}] Agent communication error: {e}")
341
+
# Continue processing - don't let one error stop the bridge
340
342
341
343
def agent_stream_handler(self, chunk) -> None:
342
344
"""Handle streaming chunks from the agent."""
+85
-61
src/letta_integration.py
+85
-61
src/letta_integration.py
···
222
222
223
223
return results
224
224
225
-
def send_message_to_agent(self, message: str, stream_handler: Optional[Callable] = None) -> Any:
225
+
def send_message_to_agent(self, message: str, stream_handler: Optional[Callable] = None, max_retries: int = 2) -> Any:
226
226
"""
227
227
Send a message to the Letta agent with streaming support.
228
228
229
229
Args:
230
230
message: Message to send to the agent
231
231
stream_handler: Optional function to handle streaming chunks
232
+
max_retries: Maximum number of retry attempts on timeout
232
233
233
234
Returns:
234
235
Streaming response object
235
236
"""
236
-
logger.debug(f"[{self.agent_name}] Sending to agent: {message[:100]}...")
237
+
from httpx import ReadTimeout
238
+
import time
237
239
238
-
try:
239
-
# Create streaming request
240
-
logger.debug(f"[{self.agent_name}] Calling create_stream API...")
241
-
message_stream = self.letta_client.agents.messages.create_stream(
242
-
agent_id=self.agent_id,
243
-
messages=[{"role": "user", "content": message}],
244
-
stream_tokens=False, # Step streaming only
245
-
max_steps=self.max_steps
246
-
)
247
-
logger.debug(f"[{self.agent_name}] Stream created, processing chunks...")
240
+
last_error = None
241
+
for attempt in range(max_retries + 1):
242
+
try:
243
+
if attempt > 0:
244
+
delay = min(2 ** attempt, 10) # Exponential backoff, max 10s
245
+
logger.warning(f"[{self.agent_name}] Retry attempt {attempt}/{max_retries} after {delay}s delay...")
246
+
time.sleep(delay)
248
247
249
-
# Process streaming response
250
-
chunk_count = 0
251
-
for chunk in message_stream:
252
-
chunk_count += 1
253
-
if chunk_count == 1:
254
-
logger.debug(f"[{self.agent_name}] Received first chunk")
248
+
logger.debug(f"[{self.agent_name}] Sending to agent: {message[:100]}...")
255
249
256
-
# Handle streaming chunk
257
-
if stream_handler:
258
-
try:
259
-
stream_handler(chunk)
260
-
except Exception as e:
261
-
logger.error(f"Error in stream handler: {e}")
262
-
263
-
# Check for send_message tool calls
264
-
if hasattr(chunk, 'message_type') and chunk.message_type == 'tool_call_message':
265
-
message_content = self.process_tool_call_chunk(chunk)
266
-
if message_content:
267
-
self.add_message_to_batch(message_content)
268
-
269
-
# Check for direct assistant messages (not via tool)
270
-
elif hasattr(chunk, 'message_type') and chunk.message_type == 'assistant_message':
271
-
if hasattr(chunk, 'content') and chunk.content:
272
-
logger.debug(f"Direct assistant message: {chunk.content[:100]}...")
273
-
self.add_message_to_batch(chunk.content)
274
-
275
-
# Log chunk information
276
-
if hasattr(chunk, 'message_type'):
277
-
if chunk.message_type == 'reasoning_message':
278
-
logger.debug(f"Reasoning: {chunk.reasoning[:100]}...")
279
-
elif chunk.message_type == 'tool_call_message':
280
-
tool_name = chunk.tool_call.name
281
-
logger.debug(f"Tool call: {tool_name}")
282
-
elif chunk.message_type == 'tool_return_message':
283
-
status = chunk.status
284
-
logger.debug(f"Tool result: {chunk.name} -> {status}")
285
-
elif chunk.message_type == 'assistant_message':
286
-
logger.debug(f"Assistant: {chunk.content[:100]}...")
287
-
288
-
if str(chunk) == 'done':
289
-
logger.debug(f"[{self.agent_name}] Received 'done' signal")
290
-
break
250
+
# Create streaming request
251
+
logger.debug(f"[{self.agent_name}] Calling create_stream API...")
252
+
message_stream = self.letta_client.agents.messages.create_stream(
253
+
agent_id=self.agent_id,
254
+
messages=[{"role": "user", "content": message}],
255
+
stream_tokens=False, # Step streaming only
256
+
max_steps=self.max_steps
257
+
)
258
+
logger.debug(f"[{self.agent_name}] Stream created, processing chunks...")
291
259
292
-
logger.debug(f"[{self.agent_name}] Processed {chunk_count} chunks")
260
+
# Process streaming response
261
+
chunk_count = 0
262
+
for chunk in message_stream:
263
+
chunk_count += 1
264
+
if chunk_count == 1:
265
+
logger.debug(f"[{self.agent_name}] Received first chunk")
266
+
267
+
# Handle streaming chunk
268
+
if stream_handler:
269
+
try:
270
+
stream_handler(chunk)
271
+
except Exception as e:
272
+
logger.error(f"Error in stream handler: {e}")
273
+
274
+
# Check for send_message tool calls
275
+
if hasattr(chunk, 'message_type') and chunk.message_type == 'tool_call_message':
276
+
message_content = self.process_tool_call_chunk(chunk)
277
+
if message_content:
278
+
self.add_message_to_batch(message_content)
279
+
280
+
# Check for direct assistant messages (not via tool)
281
+
elif hasattr(chunk, 'message_type') and chunk.message_type == 'assistant_message':
282
+
if hasattr(chunk, 'content') and chunk.content:
283
+
logger.debug(f"Direct assistant message: {chunk.content[:100]}...")
284
+
self.add_message_to_batch(chunk.content)
285
+
286
+
# Log chunk information
287
+
if hasattr(chunk, 'message_type'):
288
+
if chunk.message_type == 'reasoning_message':
289
+
logger.debug(f"Reasoning: {chunk.reasoning[:100]}...")
290
+
elif chunk.message_type == 'tool_call_message':
291
+
tool_name = chunk.tool_call.name
292
+
logger.debug(f"Tool call: {tool_name}")
293
+
elif chunk.message_type == 'tool_return_message':
294
+
status = chunk.status
295
+
logger.debug(f"Tool result: {chunk.name} -> {status}")
296
+
elif chunk.message_type == 'assistant_message':
297
+
logger.debug(f"Assistant: {chunk.content[:100]}...")
293
298
294
-
# Flush any remaining messages in batch
295
-
self.flush_batch()
299
+
if str(chunk) == 'done':
300
+
logger.debug(f"[{self.agent_name}] Received 'done' signal")
301
+
break
296
302
297
-
logger.debug(f"[{self.agent_name}] Agent processing completed")
298
-
return message_stream
303
+
logger.debug(f"[{self.agent_name}] Processed {chunk_count} chunks")
304
+
305
+
# Flush any remaining messages in batch
306
+
self.flush_batch()
299
307
300
-
except Exception as e:
301
-
logger.error(f"[{self.agent_name}] Error sending message to agent: {e}", exc_info=True)
302
-
raise
308
+
logger.debug(f"[{self.agent_name}] Agent processing completed")
309
+
return message_stream
310
+
311
+
except ReadTimeout as e:
312
+
last_error = e
313
+
logger.warning(f"[{self.agent_name}] Read timeout on attempt {attempt + 1}/{max_retries + 1}")
314
+
if attempt >= max_retries:
315
+
logger.error(f"[{self.agent_name}] Max retries exceeded, giving up")
316
+
raise
317
+
# Continue to next retry
318
+
continue
319
+
320
+
except Exception as e:
321
+
logger.error(f"[{self.agent_name}] Error sending message to agent: {e}", exc_info=True)
322
+
raise
323
+
324
+
# Should not reach here, but just in case
325
+
if last_error:
326
+
raise last_error
303
327
304
328
def get_agent_info(self) -> Optional[Dict[str, Any]]:
305
329
"""