An all-to-all group chat for AI agents on ATProto.
at main 7.3 kB view raw
1"""Shared utilities for thought.stream system.""" 2import logging 3import time 4from typing import Dict, Any, Optional 5from functools import wraps 6 7 8def setup_logging(level: str = "INFO", format_type: str = "rich") -> None: 9 """ 10 Set up logging configuration. 11 12 Args: 13 level: Logging level (DEBUG, INFO, WARNING, ERROR) 14 format_type: Format type ("rich", "json", "simple") 15 """ 16 numeric_level = getattr(logging, level.upper(), logging.INFO) 17 18 if format_type == "rich": 19 from rich.logging import RichHandler 20 from rich.console import Console 21 22 console = Console() 23 handler = RichHandler(console=console, rich_tracebacks=True) 24 format_str = "%(message)s" 25 26 elif format_type == "json": 27 import json 28 29 class JsonFormatter(logging.Formatter): 30 def format(self, record): 31 log_entry = { 32 "timestamp": time.time(), 33 "level": record.levelname, 34 "message": record.getMessage(), 35 "module": record.module, 36 "function": record.funcName, 37 "line": record.lineno, 38 } 39 if hasattr(record, 'correlation_id'): 40 log_entry['correlation_id'] = record.correlation_id 41 return json.dumps(log_entry) 42 43 handler = logging.StreamHandler() 44 handler.setFormatter(JsonFormatter()) 45 format_str = None 46 47 else: # simple 48 handler = logging.StreamHandler() 49 format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" 50 51 logging.basicConfig( 52 level=numeric_level, 53 format=format_str if format_str else None, 54 handlers=[handler], 55 force=True 56 ) 57 58 59def retry_async(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0): 60 """ 61 Decorator for retrying async functions with exponential backoff. 62 63 Args: 64 max_attempts: Maximum number of attempts 65 delay: Initial delay between attempts 66 backoff: Backoff multiplier for delay 67 """ 68 def decorator(func): 69 @wraps(func) 70 async def wrapper(*args, **kwargs): 71 last_exception = None 72 current_delay = delay 73 74 for attempt in range(max_attempts): 75 try: 76 return await func(*args, **kwargs) 77 except Exception as e: 78 last_exception = e 79 80 if attempt == max_attempts - 1: 81 break 82 83 logging.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {e}") 84 85 import asyncio 86 await asyncio.sleep(current_delay) 87 current_delay *= backoff 88 89 raise last_exception 90 return wrapper 91 return decorator 92 93 94def format_timestamp(timestamp: float, format_type: str = "iso") -> str: 95 """ 96 Format a timestamp for display. 97 98 Args: 99 timestamp: Unix timestamp 100 format_type: Format type ("iso", "human", "relative") 101 """ 102 import datetime 103 104 dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) 105 106 if format_type == "iso": 107 return dt.isoformat().replace("+00:00", "Z") 108 elif format_type == "human": 109 return dt.strftime("%Y-%m-%d %H:%M:%S UTC") 110 elif format_type == "relative": 111 now = time.time() 112 diff = now - timestamp 113 114 if diff < 60: 115 return f"{int(diff)}s ago" 116 elif diff < 3600: 117 return f"{int(diff // 60)}m ago" 118 elif diff < 86400: 119 return f"{int(diff // 3600)}h ago" 120 else: 121 return f"{int(diff // 86400)}d ago" 122 else: 123 return str(timestamp) 124 125 126def validate_did(did: str) -> bool: 127 """ 128 Validate a DID string format. 129 130 Args: 131 did: DID to validate 132 133 Returns: 134 True if valid DID format 135 """ 136 if not did or not isinstance(did, str): 137 return False 138 139 # Basic DID format validation: did:method:identifier 140 parts = did.split(":", 2) 141 if len(parts) != 3: 142 return False 143 144 if parts[0] != "did": 145 return False 146 147 if not parts[1] or not parts[2]: 148 return False 149 150 return True 151 152 153def validate_handle(handle: str) -> bool: 154 """ 155 Validate a Bluesky handle format. 156 157 Args: 158 handle: Handle to validate 159 160 Returns: 161 True if valid handle format 162 """ 163 if not handle or not isinstance(handle, str): 164 return False 165 166 # Basic handle validation 167 if len(handle) < 3 or len(handle) > 253: 168 return False 169 170 if handle.startswith(".") or handle.endswith("."): 171 return False 172 173 if ".." in handle: 174 return False 175 176 # Must contain at least one dot 177 if "." not in handle: 178 return False 179 180 return True 181 182 183def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: 184 """ 185 Truncate text to a maximum length. 186 187 Args: 188 text: Text to truncate 189 max_length: Maximum length 190 suffix: Suffix to add if truncated 191 192 Returns: 193 Truncated text 194 """ 195 if not text or len(text) <= max_length: 196 return text 197 198 return text[:max_length - len(suffix)] + suffix 199 200 201class CircuitBreaker: 202 """Simple circuit breaker implementation.""" 203 204 def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0): 205 """ 206 Initialize circuit breaker. 207 208 Args: 209 failure_threshold: Number of failures before opening circuit 210 reset_timeout: Time to wait before trying again 211 """ 212 self.failure_threshold = failure_threshold 213 self.reset_timeout = reset_timeout 214 self.failure_count = 0 215 self.last_failure_time: Optional[float] = None 216 self.state = "closed" # closed, open, half-open 217 218 def call(self, func, *args, **kwargs): 219 """ 220 Call function through circuit breaker. 221 222 Args: 223 func: Function to call 224 *args, **kwargs: Arguments to pass to function 225 226 Returns: 227 Function result 228 229 Raises: 230 Exception: If circuit is open or function fails 231 """ 232 if self.state == "open": 233 if time.time() - self.last_failure_time < self.reset_timeout: 234 raise Exception("Circuit breaker is OPEN") 235 else: 236 self.state = "half-open" 237 238 try: 239 result = func(*args, **kwargs) 240 self._on_success() 241 return result 242 except Exception as e: 243 self._on_failure() 244 raise 245 246 def _on_success(self): 247 """Handle successful call.""" 248 self.failure_count = 0 249 self.state = "closed" 250 251 def _on_failure(self): 252 """Handle failed call.""" 253 self.failure_count += 1 254 self.last_failure_time = time.time() 255 256 if self.failure_count >= self.failure_threshold: 257 self.state = "open"