#!/usr/bin/env python3
"""Jetstream handler for listening to stream.thought.blip records."""

import asyncio
import json
import logging
import signal
import sys
import time
from typing import Optional, Set
from datetime import datetime, timezone
from pathlib import Path

import websockets
import click
from rich.console import Console
from rich.logging import RichHandler

try:
    from .config_loader import load_config, get_jetstream_config
    from .models import JetstreamEvent, BlipRecord, BlipMessage
    from .did_cache import DIDCache
except ImportError:
    # Handle running as a script directly: make sibling modules importable.
    # (sys and Path are already imported above.)
    sys.path.insert(0, str(Path(__file__).parent))
    from config_loader import load_config, get_jetstream_config
    from models import JetstreamEvent, BlipRecord, BlipMessage
    from did_cache import DIDCache

# Set up logging
console = Console()
logging.basicConfig(
    level=logging.INFO,
    format="%(message)s",
    datefmt="[%X]",
    handlers=[RichHandler(console=console, rich_tracebacks=True)],
)
logger = logging.getLogger(__name__)

# Collection this handler subscribes to and filters on.
BLIP_COLLECTION = "stream.thought.blip"
# Upper bound, in seconds, for the reconnect backoff delay (5 minutes).
MAX_RECONNECT_DELAY = 300
# Cap on the backoff exponent so 2 ** n stays small no matter how many
# attempts have been made (matters when max_reconnect_attempts is 0/unlimited
# and reconnect_delay is a float, where an unbounded power would overflow).
MAX_BACKOFF_EXPONENT = 32


class JetstreamHandler:
    """Handler for ATProto Jetstream websocket connections."""

    def __init__(self, config: dict):
        """Initialize the handler.

        Args:
            config: Application configuration. Must contain
                ``config['cache']['max_cache_size']`` and
                ``config['cache']['did_cache_ttl']``; the jetstream section
                is extracted with ``get_jetstream_config``.
        """
        self.config = config
        self.jetstream_config = get_jetstream_config(config)
        self.did_cache = DIDCache(
            max_size=config['cache']['max_cache_size'],
            ttl=config['cache']['did_cache_ttl'],
        )
        # Client-side connection returned by websockets.connect().
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.running = False
        self.reconnect_count = 0
        self.message_count = 0
        self.cursor: Optional[int] = None
        self.wanted_dids: Set[str] = set(self.jetstream_config.get('wanted_dids', []))
        self.output_format = "display"  # or "json"
        # Loop and task currently executing run_with_reconnect(); recorded so
        # request_shutdown() can cancel the run from a signal handler.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._run_task: Optional[asyncio.Task] = None
        # Set once stop() has released the DID cache, making stop() idempotent.
        self._stopped = False

    def build_websocket_url(self) -> str:
        """Build the websocket URL with query parameters.

        Returns:
            The configured instance URL, ending in ``/subscribe``, followed
            by the collection filter, one ``wantedDids`` parameter per wanted
            DID, the cursor (when set) and ``compress=false``.
        """
        base_url = self.jetstream_config['instance']
        if not base_url.endswith('/subscribe'):
            base_url = base_url.rstrip('/') + '/subscribe'

        # Filter for the blip collection
        params = [f"wantedCollections={BLIP_COLLECTION}"]

        # Add wanted DIDs if specified
        params.extend(f"wantedDids={did}" for did in self.wanted_dids)

        # Add cursor if specified. Compare against None: a cursor of 0 is
        # still a value the caller supplied and must not be silently dropped.
        if self.cursor is not None:
            params.append(f"cursor={self.cursor}")

        # Disable compression for now
        params.append("compress=false")

        return base_url + "?" + "&".join(params)

    async def connect(self) -> bool:
        """Connect to jetstream websocket.

        Returns:
            True when the connection was established, False when any
            exception was raised while connecting (the error is logged).
        """
        url = self.build_websocket_url()

        try:
            logger.info("Connecting to jetstream: %s", url)
            self.websocket = await websockets.connect(
                url,
                ping_interval=30,
                ping_timeout=10,
                close_timeout=10,
            )
            logger.info("Connected to jetstream")
            return True
        except Exception as e:
            logger.error("Failed to connect to jetstream: %s", e)
            return False

    async def disconnect(self) -> None:
        """Disconnect from jetstream. Does nothing when not connected."""
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
            logger.info("Disconnected from jetstream")

    async def handle_message(self, message: str) -> None:
        """Handle incoming jetstream message.

        Parses the JSON payload into a ``JetstreamEvent``, records its
        ``time_us`` as the resume cursor, and outputs a ``BlipMessage`` for
        non-delete commits in the blip collection. Parsing and processing
        errors are logged rather than raised so one bad message does not end
        the listen loop.

        Args:
            message: Raw JSON text received from the websocket.
        """
        try:
            data = json.loads(message)
            event = JetstreamEvent(**data)

            # Update cursor for resumption
            self.cursor = event.time_us

            # Only process commit events with blip records
            if event.kind != "commit" or not event.commit:
                return

            commit = event.commit
            if commit.collection != BLIP_COLLECTION:
                return

            # Skip delete operations (no record data)
            if commit.operation == "delete":
                logger.debug("Skipping delete operation for %s", event.did)
                return

            # Filter by wanted DIDs if specified
            if self.wanted_dids and event.did not in self.wanted_dids:
                return

            # Parse blip record
            if not commit.record:
                logger.warning("No record data in commit from %s", event.did)
                return

            try:
                blip_record = BlipRecord(**commit.record)
            except Exception as e:
                logger.warning("Failed to parse blip record from %s: %s", event.did, e)
                return

            # Resolve DID to profile data
            profile_data = await self.did_cache.resolve_did(event.did)
            if profile_data:
                handle = profile_data.handle
                display_name = profile_data.display_name
            else:
                handle = event.did  # Fallback to DID if resolution fails
                display_name = None

            # Create formatted message
            blip_message = BlipMessage(
                author_handle=handle,
                author_display_name=display_name,
                author_did=event.did,
                created_at=blip_record.created_at,
                content=blip_record.content,
                record_uri=f"at://{event.did}/{commit.collection}/{commit.rkey}",
                record_cid=commit.cid,
            )

            # Output the message
            await self.output_message(blip_message)
            self.message_count += 1

        except json.JSONDecodeError as e:
            logger.error("Failed to parse JSON message: %s", e)
        except Exception as e:
            logger.error("Error handling message: %s", e)

    async def output_message(self, message: BlipMessage) -> None:
        """Output a blip message in the specified format.

        Args:
            message: The message to print to the console.
        """
        if self.output_format == "json":
            # JSON must reach the consumer verbatim: switch off rich's markup
            # and emoji substitution, syntax highlighting and line wrapping,
            # any of which would alter the serialized text.
            console.print(
                message.to_json(),
                markup=False,
                emoji=False,
                highlight=False,
                soft_wrap=True,
            )
        else:
            console.print(message.format_display())

        # Add a small separator for readability in display mode
        if self.output_format == "display":
            console.print()

    async def listen(self) -> None:
        """Listen for messages on the websocket.

        Returns when the connection closes or an error occurs in the loop;
        both are logged.

        Raises:
            RuntimeError: If called before a connection is established.
        """
        if not self.websocket:
            raise RuntimeError("Not connected to websocket")

        try:
            async for message in self.websocket:
                await self.handle_message(message)
        except websockets.exceptions.ConnectionClosed:
            logger.warning("Websocket connection closed")
        except Exception as e:
            logger.error("Error in listen loop: %s", e)

    async def run_with_reconnect(self) -> None:
        """Run the handler with automatic reconnection.

        Loops connect -> listen -> disconnect -> backoff until ``running``
        becomes False. A cancellation triggered by ``request_shutdown()`` is
        treated as a normal exit; any other cancellation is re-raised.
        """
        self.running = True
        # Remember where we are running so request_shutdown() can cancel us.
        self._loop = asyncio.get_running_loop()
        self._run_task = asyncio.current_task()

        try:
            while self.running:
                try:
                    if await self.connect():
                        # Reset reconnect count on successful connection
                        self.reconnect_count = 0
                        # Listen for messages
                        await self.listen()
                except KeyboardInterrupt:
                    logger.info("Received interrupt signal, shutting down...")
                    break
                except Exception as e:
                    logger.error("Unexpected error: %s", e)
                finally:
                    await self.disconnect()

                # Handle reconnection if still running (covers both a failed
                # connect and a connection that ended).
                if self.running:
                    await self._handle_reconnect_delay()
        except asyncio.CancelledError:
            if self.running:
                # Not our own shutdown request: let the cancellation propagate.
                raise
            logger.debug("Run loop cancelled by shutdown request")
        finally:
            self._loop = None
            self._run_task = None

    async def _handle_reconnect_delay(self) -> None:
        """Handle reconnection delay with exponential backoff.

        Increments ``reconnect_count``; when ``max_reconnect_attempts`` is
        positive and exceeded, clears ``running`` and returns without
        sleeping. Otherwise sleeps ``reconnect_delay * 2 ** (attempt - 1)``
        seconds, capped at ``MAX_RECONNECT_DELAY``.
        """
        self.reconnect_count += 1
        max_attempts = self.jetstream_config['max_reconnect_attempts']

        if max_attempts > 0 and self.reconnect_count > max_attempts:
            logger.error("Max reconnection attempts (%s) exceeded", max_attempts)
            self.running = False
            return

        # Exponential backoff: base_delay * (2 ^ attempt). The exponent is
        # bounded so the intermediate power cannot grow without limit.
        base_delay = self.jetstream_config['reconnect_delay']
        exponent = min(self.reconnect_count - 1, MAX_BACKOFF_EXPONENT)
        delay = min(base_delay * (2 ** exponent), MAX_RECONNECT_DELAY)

        logger.info("Reconnecting in %ss (attempt %s)", delay, self.reconnect_count)
        await asyncio.sleep(delay)

    def request_shutdown(self) -> None:
        """Ask a running ``run_with_reconnect()`` to exit promptly.

        Synchronous so it can be called from a ``signal.signal`` handler.
        Clears ``running`` and cancels the run task via
        ``call_soon_threadsafe``, which also interrupts a pending reconnect
        sleep or a blocked receive. Does nothing further when no run is in
        progress.
        """
        self.running = False
        loop, task = self._loop, self._run_task
        if loop is None or task is None:
            return
        try:
            loop.call_soon_threadsafe(task.cancel)
        except RuntimeError:
            # The loop was closed between the check and the call; nothing
            # is left to cancel.
            pass

    async def stop(self) -> None:
        """Stop the handler.

        Clears ``running``, disconnects, then closes the DID cache and logs
        statistics. Safe to call more than once: the cache is closed and the
        statistics are logged only on the first call.
        """
        self.running = False
        await self.disconnect()
        if self._stopped:
            return
        self._stopped = True
        await self.did_cache.close()

        stats = self.did_cache.stats()
        logger.info("Processed %s messages", self.message_count)
        logger.info("DID cache stats: %s", stats)

    def add_wanted_did(self, did: str) -> None:
        """Add a DID to the wanted list."""
        self.wanted_dids.add(did)
        logger.info("Added DID to wanted list: %s", did)

    def remove_wanted_did(self, did: str) -> None:
        """Remove a DID from the wanted list (no error if absent)."""
        self.wanted_dids.discard(did)
        logger.info("Removed DID from wanted list: %s", did)

    def set_output_format(self, format_type: str) -> None:
        """Set the output format (display or json).

        Raises:
            ValueError: If ``format_type`` is neither 'display' nor 'json'.
        """
        if format_type not in ("display", "json"):
            raise ValueError("Output format must be 'display' or 'json'")
        self.output_format = format_type
        logger.info("Output format set to: %s", format_type)


# Global handler instance for signal handling
handler_instance: Optional[JetstreamHandler] = None


def signal_handler(signum, frame):
    """Handle shutdown signals.

    Only requests shutdown; the cleanup itself runs on the event loop once
    the run task has been cancelled. Creating tasks directly from here is
    avoided because this function is not invoked by the event loop.
    """
    if handler_instance:
        logger.info("Received shutdown signal, stopping handler...")
        handler_instance.request_shutdown()


async def _run_until_stopped(handler: JetstreamHandler) -> None:
    """Run the handler and always stop it on the same event loop."""
    try:
        await handler.run_with_reconnect()
    finally:
        await handler.stop()


@click.command()
@click.option('--config', '-c', type=click.Path(exists=True),
              help='Path to configuration file')
@click.option('--dids', help='Comma-separated list of DIDs to monitor')
@click.option('--cursor', type=int,
              help='Cursor position to start from (unix microseconds)')
@click.option('--output', type=click.Choice(['display', 'json']),
              default='display', help='Output format')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging')
def main(config: Optional[str], dids: Optional[str], cursor: Optional[int],
         output: str, verbose: bool):
    """Listen for stream.thought.blip records on ATProto jetstream."""
    global handler_instance

    # Set up logging level
    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        # Load configuration
        app_config = load_config(config)

        # Create handler
        handler_instance = JetstreamHandler(app_config)

        # Override wanted DIDs if provided via command line
        if dids:
            did_list = [did.strip() for did in dids.split(',') if did.strip()]
            handler_instance.wanted_dids = set(did_list)
            logger.info("Monitoring DIDs: %s", did_list)
        elif handler_instance.wanted_dids:
            logger.info("Monitoring configured DIDs: %s",
                        list(handler_instance.wanted_dids))
        else:
            logger.info("Monitoring all DIDs (no filter applied)")

        # Set cursor if provided (0 is a legitimate explicit value)
        if cursor is not None:
            handler_instance.cursor = cursor
            logger.info("Starting from cursor: %s", cursor)

        # Set output format
        handler_instance.set_output_format(output)

        # Set up signal handlers
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Run the handler; cleanup happens inside the same event loop.
        asyncio.run(_run_until_stopped(handler_instance))

    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error("Fatal error: %s", e)
        sys.exit(1)
    finally:
        # Covers failures that happen before the run starts. stop() is
        # idempotent, so this does not repeat cleanup after a normal run.
        if handler_instance:
            asyncio.run(handler_instance.stop())


if __name__ == '__main__':
    main()