"""Shared utilities for thought.stream system.""" import logging import time from typing import Dict, Any, Optional from functools import wraps def setup_logging(level: str = "INFO", format_type: str = "rich") -> None: """ Set up logging configuration. Args: level: Logging level (DEBUG, INFO, WARNING, ERROR) format_type: Format type ("rich", "json", "simple") """ numeric_level = getattr(logging, level.upper(), logging.INFO) if format_type == "rich": from rich.logging import RichHandler from rich.console import Console console = Console() handler = RichHandler(console=console, rich_tracebacks=True) format_str = "%(message)s" elif format_type == "json": import json class JsonFormatter(logging.Formatter): def format(self, record): log_entry = { "timestamp": time.time(), "level": record.levelname, "message": record.getMessage(), "module": record.module, "function": record.funcName, "line": record.lineno, } if hasattr(record, 'correlation_id'): log_entry['correlation_id'] = record.correlation_id return json.dumps(log_entry) handler = logging.StreamHandler() handler.setFormatter(JsonFormatter()) format_str = None else: # simple handler = logging.StreamHandler() format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" logging.basicConfig( level=numeric_level, format=format_str if format_str else None, handlers=[handler], force=True ) def retry_async(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0): """ Decorator for retrying async functions with exponential backoff. Args: max_attempts: Maximum number of attempts delay: Initial delay between attempts backoff: Backoff multiplier for delay """ def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): last_exception = None current_delay = delay for attempt in range(max_attempts): try: return await func(*args, **kwargs) except Exception as e: last_exception = e if attempt == max_attempts - 1: break logging.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {e}") import asyncio await asyncio.sleep(current_delay) current_delay *= backoff raise last_exception return wrapper return decorator def format_timestamp(timestamp: float, format_type: str = "iso") -> str: """ Format a timestamp for display. Args: timestamp: Unix timestamp format_type: Format type ("iso", "human", "relative") """ import datetime dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) if format_type == "iso": return dt.isoformat().replace("+00:00", "Z") elif format_type == "human": return dt.strftime("%Y-%m-%d %H:%M:%S UTC") elif format_type == "relative": now = time.time() diff = now - timestamp if diff < 60: return f"{int(diff)}s ago" elif diff < 3600: return f"{int(diff // 60)}m ago" elif diff < 86400: return f"{int(diff // 3600)}h ago" else: return f"{int(diff // 86400)}d ago" else: return str(timestamp) def validate_did(did: str) -> bool: """ Validate a DID string format. Args: did: DID to validate Returns: True if valid DID format """ if not did or not isinstance(did, str): return False # Basic DID format validation: did:method:identifier parts = did.split(":", 2) if len(parts) != 3: return False if parts[0] != "did": return False if not parts[1] or not parts[2]: return False return True def validate_handle(handle: str) -> bool: """ Validate a Bluesky handle format. Args: handle: Handle to validate Returns: True if valid handle format """ if not handle or not isinstance(handle, str): return False # Basic handle validation if len(handle) < 3 or len(handle) > 253: return False if handle.startswith(".") or handle.endswith("."): return False if ".." in handle: return False # Must contain at least one dot if "." not in handle: return False return True def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: """ Truncate text to a maximum length. Args: text: Text to truncate max_length: Maximum length suffix: Suffix to add if truncated Returns: Truncated text """ if not text or len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix class CircuitBreaker: """Simple circuit breaker implementation.""" def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0): """ Initialize circuit breaker. Args: failure_threshold: Number of failures before opening circuit reset_timeout: Time to wait before trying again """ self.failure_threshold = failure_threshold self.reset_timeout = reset_timeout self.failure_count = 0 self.last_failure_time: Optional[float] = None self.state = "closed" # closed, open, half-open def call(self, func, *args, **kwargs): """ Call function through circuit breaker. Args: func: Function to call *args, **kwargs: Arguments to pass to function Returns: Function result Raises: Exception: If circuit is open or function fails """ if self.state == "open": if time.time() - self.last_failure_time < self.reset_timeout: raise Exception("Circuit breaker is OPEN") else: self.state = "half-open" try: result = func(*args, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): """Handle successful call.""" self.failure_count = 0 self.state = "closed" def _on_failure(self): """Handle failed call.""" self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "open"