An all-to-all group chat for AI agents on ATProto.

Fix race condition causing duplicate message processing

- Add processing_queue flag to prevent concurrent queue processing
- Check processing flag before attempting to process queue
- Cancel and clear flush task when processing begins
- Add proper cleanup in finally block to always reset flag
- Improve shutdown handling to cancel pending tasks
- Process remaining messages on shutdown before exit

Fixes issue where multiple timeout flushes would trigger simultaneously,
causing the same batch of messages to be sent to the agent multiple times.
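
The fix pairs the new flag with the existing queue lock: the flag is flipped under the lock before any await on the agent and always cleared in a finally block, so overlapping flushes collapse into a single send. Below is a minimal, self-contained sketch of that pattern (QueueGuardSketch, batches_sent and the sleep are illustrative stand-ins, not the real bridge class; the field names mirror the diff further down):

```python
import asyncio
from typing import Any, Dict, List, Optional


class QueueGuardSketch:
    """Toy model of the flag-guarded queue flush (not the real bridge)."""

    def __init__(self) -> None:
        self.message_queue: List[Dict[str, Any]] = []
        self.queue_lock = asyncio.Lock()
        self.processing_queue = False          # guards against concurrent flushes
        self.flush_task: Optional[asyncio.Task] = None
        self.batches_sent = 0                  # stand-in for "sent to agent" counter

    async def process_message_queue(self) -> None:
        async with self.queue_lock:
            # A second flush arriving while a batch is in flight bails out here
            # instead of re-sending the same messages.
            if self.processing_queue or not self.message_queue:
                return
            self.processing_queue = True
            items = self.message_queue.copy()
            self.message_queue.clear()
            # The pending timeout flush is no longer needed for this batch.
            if self.flush_task and not self.flush_task.done():
                self.flush_task.cancel()
                self.flush_task = None
        try:
            await asyncio.sleep(0.05)          # stand-in for the agent call
            self.batches_sent += 1
        finally:
            async with self.queue_lock:
                self.processing_queue = False  # always let the next flush run
```

Keeping the agent call outside the lock matters: holding queue_lock across that await would also serialize flushes, but it would block enqueueing new messages for the whole time the agent is responding.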

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
src/jetstream_letta_bridge.py  +47 -12
--- a/src/jetstream_letta_bridge.py
+++ b/src/jetstream_letta_bridge.py
@@ -74,6 +74,7 @@
         # Message processing
         self.message_queue: List[Dict[str, Any]] = []
         self.queue_lock = asyncio.Lock()
+        self.processing_queue = False  # Flag to prevent concurrent processing
         self.batch_size = config.get('agent', {}).get('batch_size', 1)
         self.queue_flush_timeout = config.get('agent', {}).get('queue_flush_timeout', 30)  # seconds
         self.queue_first_message_time: Optional[float] = None
@@ -273,33 +274,45 @@
         try:
             await asyncio.sleep(self.queue_flush_timeout)
 
-            # Check if we still have messages to process
+            # Check if we still have messages to process and not already processing
             async with self.queue_lock:
-                if self.message_queue:
+                if self.message_queue and not self.processing_queue:
                     console.print(f"[{self.agent_name}] ⏰ Timeout flush: {len(self.message_queue)} messages")
+                    # Process outside the lock
+                else:
+                    logger.debug(f"[{self.agent_name}] Flush skipped - queue empty or already processing")
+                    return
 
             await self.process_message_queue()
 
         except asyncio.CancelledError:
             # Task was cancelled, which is fine
+            logger.debug(f"[{self.agent_name}] Flush task cancelled")
             pass
 
     async def process_message_queue(self) -> None:
         """Process all queued messages by sending them to the agent."""
-        if not self.message_queue:
-            return
+        # Acquire lock and check if we should process
+        async with self.queue_lock:
+            # Check if already processing or queue is empty
+            if self.processing_queue:
+                logger.debug(f"[{self.agent_name}] Already processing queue, skipping")
+                return
 
-        # Get all queued items
-        async with self.queue_lock:
+            if not self.message_queue:
+                logger.debug(f"[{self.agent_name}] Queue is empty, nothing to process")
+                return
+
+            # Set processing flag and get items
+            self.processing_queue = True
             items_to_process = self.message_queue.copy()
             self.message_queue.clear()
             # Reset queue timing
             self.queue_first_message_time = None
+            # Cancel flush task since we're processing now
            if self.flush_task and not self.flush_task.done():
                 self.flush_task.cancel()
+                self.flush_task = None
-
-        if not items_to_process:
-            return
 
         # Create combined prompt for batch processing
         if len(items_to_process) == 1:
@@ -339,6 +352,11 @@
             console.print(f"[{self.agent_name}] ❌ {error_name}: {str(e)[:100]}")
             logger.error(f"[{self.agent_name}] Agent communication error: {e}")
             # Continue processing - don't let one error stop the bridge
+        finally:
+            # Always clear the processing flag when done
+            async with self.queue_lock:
+                self.processing_queue = False
+            logger.debug(f"[{self.agent_name}] Queue processing complete, flag cleared")
 
     def agent_stream_handler(self, chunk) -> None:
         """Handle streaming chunks from the agent."""
@@ -487,18 +505,35 @@
 
     async def stop(self) -> None:
         """Stop the bridge."""
+        logger.debug(f"[{self.agent_name}] Stopping bridge...")
         self.running = False
+
+        # Cancel any pending flush task
+        if self.flush_task and not self.flush_task.done():
+            logger.debug(f"[{self.agent_name}] Cancelling flush task")
+            self.flush_task.cancel()
+            try:
+                await self.flush_task
+            except asyncio.CancelledError:
+                pass
+
+        # Process any remaining messages in queue
+        if self.message_queue:
+            logger.debug(f"[{self.agent_name}] Processing {len(self.message_queue)} remaining messages")
+            await self.process_message_queue()
+
         await self.disconnect_jetstream()
         await self.did_cache.close()
-
+
         # Final statistics
         elapsed = time.time() - self.start_time
-        console.print(f"\n📊 Final Stats:")
+        console.print(f"\n[{self.agent_name}] 📊 Final Stats:")
         console.print(f"  Received: {self.blips_received}")
         console.print(f"  Sent: {self.messages_sent_to_agent}")
         console.print(f"  Published: {self.blips_published}")
         console.print(f"  Runtime: {elapsed/60:.1f}m")
-        console.print(f"  Rate: {self.blips_received / (elapsed / 60):.1f}/min")
+        if elapsed > 0:
+            console.print(f"  Rate: {self.blips_received / (elapsed / 60):.1f}/min")
 
 
 def list_available_agents(directory: str) -> None:
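
For a quick check of the intended behaviour, here is a small driver (assuming the hypothetical QueueGuardSketch class from the sketch in the commit message is in scope) that mimics several timeout flushes firing at once, which is exactly the failure mode this commit closes:

```python
import asyncio

async def demo() -> None:
    bridge = QueueGuardSketch()  # hypothetical class from the sketch above
    bridge.message_queue = [{"text": f"blip {i}"} for i in range(3)]
    # Five "simultaneous timeout flushes" race for the same batch.
    await asyncio.gather(*(bridge.process_message_queue() for _ in range(5)))
    print("batches sent:", bridge.batches_sent)  # 1, not 5

asyncio.run(demo())
```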