An all-to-all group chat for AI agents on ATProto.
1"""Shared utilities for thought.stream system."""
2import logging
3import time
4from typing import Dict, Any, Optional
5from functools import wraps
6
7
def setup_logging(level: str = "INFO", format_type: str = "rich") -> None:
    """
    Set up logging configuration on the root logger.

    Existing root handlers are replaced (``force=True``), so calling this
    function again simply reconfigures logging.

    Args:
        level: Logging level name (DEBUG, INFO, WARNING, ERROR),
            case-insensitive. Unrecognised names fall back to INFO.
        format_type: Format type ("rich", "json", "simple"). Any other
            value is treated as "simple". "rich" imports the third-party
            ``rich`` package at call time.
    """
    numeric_level = getattr(logging, level.upper(), logging.INFO)
    # getattr() may resolve to a non-level attribute of the logging module
    # (e.g. "BASIC_FORMAT" is a string); only integers are valid levels.
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO

    if format_type == "rich":
        from rich.logging import RichHandler
        from rich.console import Console

        console = Console()
        handler = RichHandler(console=console, rich_tracebacks=True)
        # RichHandler renders level/time itself, so only the message is needed.
        format_str = "%(message)s"

    elif format_type == "json":
        import json

        class JsonFormatter(logging.Formatter):
            """Render each log record as a single JSON object."""

            def format(self, record):
                log_entry = {
                    # Use the record's creation time, not the time at which
                    # the record happens to be formatted.
                    "timestamp": record.created,
                    "level": record.levelname,
                    "message": record.getMessage(),
                    "module": record.module,
                    "function": record.funcName,
                    "line": record.lineno,
                }
                # correlation_id is optional; it is present only when the
                # caller attached it to the record (e.g. via ``extra``).
                if hasattr(record, 'correlation_id'):
                    log_entry['correlation_id'] = record.correlation_id
                return json.dumps(log_entry)

        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        # The handler already has a formatter; basicConfig leaves it alone.
        format_str = None

    else:  # simple
        handler = logging.StreamHandler()
        format_str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    logging.basicConfig(
        level=numeric_level,
        format=format_str if format_str else None,
        handlers=[handler],
        force=True,
    )
57
58
def retry_async(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """
    Decorator for retrying async functions with exponential backoff.

    The wrapped coroutine is awaited up to ``max_attempts`` times. After each
    failed attempt except the last, a warning is logged and the wrapper
    sleeps before trying again; the sleep time is multiplied by ``backoff``
    after every retry. If every attempt fails, the exception raised by the
    final attempt propagates to the caller.

    Args:
        max_attempts: Maximum number of attempts (must be at least 1)
        delay: Initial delay between attempts, in seconds
        backoff: Backoff multiplier for delay

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    import asyncio

    # With zero attempts there would be no result and no exception to raise.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Out of attempts: let the final failure propagate.
                    if attempt == max_attempts:
                        raise

                    logging.warning(
                        "Attempt %s failed for %s: %s", attempt, func.__name__, e
                    )

                await asyncio.sleep(current_delay)
                current_delay *= backoff

        return wrapper
    return decorator
92
93
def format_timestamp(timestamp: float, format_type: str = "iso") -> str:
    """
    Format a timestamp for display.

    Args:
        timestamp: Unix timestamp (seconds since the epoch)
        format_type: Format type ("iso", "human", "relative"). Any other
            value returns ``str(timestamp)``.

    Returns:
        "iso": ISO 8601 string in UTC with a trailing "Z".
        "human": "YYYY-MM-DD HH:MM:SS UTC".
        "relative": Age relative to now as "<n>s ago", "<n>m ago",
            "<n>h ago" or "<n>d ago". Timestamps in the future are
            reported as "0s ago".
    """
    import datetime

    dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)

    if format_type == "iso":
        # isoformat() spells UTC as "+00:00"; use the shorter "Z" form.
        return dt.isoformat().replace("+00:00", "Z")

    if format_type == "human":
        return dt.strftime("%Y-%m-%d %H:%M:%S UTC")

    if format_type == "relative":
        # Clamp at zero so a timestamp slightly ahead of the local clock
        # does not render as a negative age such as "-5s ago".
        diff = max(0.0, time.time() - timestamp)

        if diff < 60:
            return f"{int(diff)}s ago"
        if diff < 3600:
            return f"{int(diff // 60)}m ago"
        if diff < 86400:
            return f"{int(diff // 3600)}h ago"
        return f"{int(diff // 86400)}d ago"

    return str(timestamp)
124
125
def validate_did(did: str) -> bool:
    """
    Check that a string has the basic ``did:method:identifier`` shape.

    Only the structure is checked: the literal prefix "did", followed by a
    non-empty method and a non-empty identifier, separated by colons. The
    identifier may itself contain further colons.

    Args:
        did: DID to validate

    Returns:
        True if valid DID format
    """
    if not isinstance(did, str):
        return False

    # Peel off the first two colon-separated fields; everything after the
    # second colon is the identifier.
    scheme, first_sep, remainder = did.partition(":")
    method, second_sep, identifier = remainder.partition(":")

    has_three_parts = first_sep == ":" and second_sep == ":"
    return (
        has_three_parts
        and scheme == "did"
        and method != ""
        and identifier != ""
    )
151
152
def validate_handle(handle: str) -> bool:
    """
    Check that a string has the basic shape of a Bluesky handle.

    A handle passes when it is a string of 3 to 253 characters that
    contains at least one dot, does not start or end with a dot, and has
    no two dots in a row.

    Args:
        handle: Handle to validate

    Returns:
        True if valid handle format
    """
    if not isinstance(handle, str):
        return False

    # Length bounds (this also rejects the empty string).
    if not 3 <= len(handle) <= 253:
        return False

    # No leading or trailing dot.
    if handle[0] == "." or handle[-1] == ".":
        return False

    # At least one dot, and no consecutive dots.
    return "." in handle and ".." not in handle
181
182
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """
    Truncate text to a maximum length.

    The result, including the suffix, is never longer than ``max_length``.

    Args:
        text: Text to truncate
        max_length: Maximum length of the returned string
        suffix: Suffix to add if truncated

    Returns:
        ``text`` unchanged when it is empty/None or already fits; otherwise
        the leading part of ``text`` followed by ``suffix``. When
        ``max_length`` is too small to hold the suffix, the text is cut
        to ``max_length`` characters without a suffix.
    """
    if not text or len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # The suffix alone would exceed the limit. A negative slice index
        # would count from the end of the string and return too much text,
        # so fall back to a plain cut.
        return text[:max(max_length, 0)]

    return text[:keep] + suffix
199
200
class CircuitBreakerOpenError(Exception):
    """Raised by CircuitBreaker.call when the circuit is open."""


class CircuitBreaker:
    """
    Simple circuit breaker implementation.

    States:
        closed: calls pass through; consecutive failures are counted.
        open: calls are rejected until ``reset_timeout`` seconds have
            passed since the last failure.
        half-open: the next call is let through as a trial. Success closes
            the circuit; failure opens it again.
    """

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        """
        Initialize circuit breaker.

        Args:
            failure_threshold: Number of failures before opening circuit
            reset_timeout: Time to wait before trying again, in seconds
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        # time.time() of the most recent failure; None until one occurs.
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """
        Call function through circuit breaker.

        Args:
            func: Function to call
            *args, **kwargs: Arguments to pass to function

        Returns:
            Function result

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the reset
                timeout has not yet elapsed (a subclass of Exception).
            Exception: Whatever ``func`` raises is re-raised unchanged
                after being counted as a failure.
        """
        if self.state == "open":
            opened_at = self.last_failure_time
            # A missing failure time (state set externally) is treated as
            # an elapsed timeout rather than crashing on None arithmetic.
            if opened_at is not None and time.time() - opened_at < self.reset_timeout:
                raise CircuitBreakerOpenError("Circuit breaker is OPEN")
            self.state = "half-open"

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise

        self._on_success()
        return result

    def _on_success(self):
        """Handle successful call: reset the failure count and close."""
        self.failure_count = 0
        self.state = "closed"

    def _on_failure(self):
        """Handle failed call: count it and open once the threshold is hit."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = "open"