# personal memory agent
# at main 342 lines 10 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Server-side event handling framework for apps. 5 6Apps can define event handlers in `events.py` that react to Callosum events. 7Handlers are discovered at Convey startup and dispatched via a thread pool. 8 9Usage in apps/my_app/events.py: 10 11 from apps.events import on_event 12 13 @on_event("observe", "observed") 14 def handle_observation(ctx): 15 day = ctx.msg.get("day") 16 segment = ctx.msg.get("segment") 17 # React to completed observation... 18 19 @on_event("cortex", "finish") 20 def handle_agent_done(ctx): 21 # React to agent completion... 22 23 @on_event("*", "*") # Wildcard - all events 24 def log_all(ctx): 25 # Debug logging... 26 27Handlers receive an EventContext with: 28 - ctx.msg: The raw Callosum message dict 29 - ctx.app: The app name that owns this handler 30 - ctx.tract: Event tract (e.g., "observe") 31 - ctx.event: Event type (e.g., "observed") 32 33Handlers can access journal path via `from convey import state` then `state.journal_root`. 34""" 35 36from __future__ import annotations 37 38import importlib 39import logging 40from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError 41from dataclasses import dataclass 42from pathlib import Path 43from typing import Any, Callable, Dict, List, Tuple 44 45logger = logging.getLogger(__name__) 46 47# Default timeout for handler execution (seconds) 48DEFAULT_TIMEOUT = 30.0 49 50# Default number of worker threads 51DEFAULT_WORKERS = 4 52 53 54@dataclass 55class EventContext: 56 """Context passed to event handlers.""" 57 58 msg: Dict[str, Any] 59 app: str 60 tract: str 61 event: str 62 63 64# Handler registry: (tract, event) -> [(app_name, handler_fn), ...] 
65_handlers: Dict[Tuple[str, str], List[Tuple[str, Callable[[EventContext], None]]]] = {} 66 67# Thread pool for async dispatch 68_executor: ThreadPoolExecutor | None = None 69 70# Track which app is currently being imported (for decorator context) 71_current_app: str | None = None 72 73 74def on_event(tract: str, event: str) -> Callable: 75 """Decorator to register a function as an event handler. 76 77 Args: 78 tract: Callosum tract to match (e.g., "observe", "cortex") or "*" for all 79 event: Event type to match (e.g., "observed", "finish") or "*" for all 80 81 Returns: 82 Decorator function that registers the handler 83 84 Example: 85 @on_event("observe", "observed") 86 def handle_observation(ctx: EventContext): 87 print(f"Segment {ctx.msg['segment']} observed") 88 """ 89 90 def decorator(fn: Callable[[EventContext], None]) -> Callable[[EventContext], None]: 91 key = (tract, event) 92 if key not in _handlers: 93 _handlers[key] = [] 94 95 # Use current app context from discovery, or infer from module 96 app_name = _current_app 97 if app_name is None: 98 # Fallback: extract from module name (apps.my_app.events -> my_app) 99 module = fn.__module__ 100 if module.startswith("apps.") and ".events" in module: 101 parts = module.split(".") 102 if len(parts) >= 2: 103 app_name = parts[1] 104 if app_name is None: 105 app_name = "unknown" 106 107 _handlers[key].append((app_name, fn)) 108 logger.debug( 109 f"Registered handler {fn.__name__} for ({tract}, {event}) in app {app_name}" 110 ) 111 return fn 112 113 return decorator 114 115 116def discover_handlers() -> int: 117 """Discover and load event handlers from apps/*/events.py. 118 119 This function scans the apps/ directory for events.py files and 120 dynamically imports them, which triggers @on_event decorators. 
121 122 Returns: 123 Number of apps with event handlers discovered 124 125 Raises: 126 No exceptions - errors are logged but don't prevent other apps from loading 127 """ 128 global _current_app 129 130 apps_dir = Path(__file__).parent 131 132 if not apps_dir.exists(): 133 logger.debug("No apps/ directory found, skipping event handler discovery") 134 return 0 135 136 discovered_count = 0 137 total_handlers = 0 138 139 for app_dir in sorted(apps_dir.iterdir()): 140 # Skip non-directories and private directories 141 if not app_dir.is_dir() or app_dir.name.startswith("_"): 142 continue 143 144 events_file = app_dir / "events.py" 145 if not events_file.exists(): 146 continue 147 148 app_name = app_dir.name 149 150 try: 151 # Set context for decorator 152 _current_app = app_name 153 154 # Import triggers @on_event decorators 155 module_name = f"apps.{app_name}.events" 156 importlib.import_module(module_name) 157 158 # Count handlers for this app 159 app_handlers = sum( 160 1 161 for handlers in _handlers.values() 162 for (app, _) in handlers 163 if app == app_name 164 ) 165 166 discovered_count += 1 167 total_handlers += app_handlers 168 logger.info(f"Loaded {app_handlers} event handler(s) from app: {app_name}") 169 except Exception as e: 170 # Gracefully handle errors - don't break server startup 171 logger.error( 172 f"Failed to load events from app '{app_name}': {e}", exc_info=True 173 ) 174 finally: 175 _current_app = None 176 177 if discovered_count > 0: 178 logger.info( 179 f"Discovered {total_handlers} event handler(s) from {discovered_count} app(s)" 180 ) 181 182 return discovered_count 183 184 185def _get_handlers( 186 msg: Dict[str, Any], 187) -> List[Tuple[str, Callable[[EventContext], None]]]: 188 """Get all handlers matching the given message. 
189 190 Matches exact (tract, event), plus wildcards: 191 - ("*", "*") matches all events 192 - (tract, "*") matches all events in a tract 193 - ("*", event) matches event type across all tracts 194 195 Args: 196 msg: Callosum message with tract and event fields 197 198 Returns: 199 List of (app_name, handler_fn) tuples 200 """ 201 tract = msg.get("tract", "") 202 event = msg.get("event", "") 203 204 handlers = [] 205 206 # Exact match 207 handlers.extend(_handlers.get((tract, event), [])) 208 209 # Wildcard: all events in this tract 210 if (tract, "*") in _handlers: 211 handlers.extend(_handlers[(tract, "*")]) 212 213 # Wildcard: this event type in any tract 214 if ("*", event) in _handlers: 215 handlers.extend(_handlers[("*", event)]) 216 217 # Wildcard: all events 218 if ("*", "*") in _handlers: 219 handlers.extend(_handlers[("*", "*")]) 220 221 return handlers 222 223 224def _run_handler( 225 app_name: str, 226 handler: Callable[[EventContext], None], 227 ctx: EventContext, 228) -> None: 229 """Run a single handler with error handling. 230 231 Args: 232 app_name: Name of the app that owns this handler 233 handler: The handler function to call 234 ctx: Event context to pass to handler 235 """ 236 try: 237 handler(ctx) 238 except Exception as e: 239 logger.error( 240 f"Event handler {handler.__name__} (app: {app_name}) failed: {e}", 241 exc_info=True, 242 ) 243 244 245def dispatch(msg: Dict[str, Any], timeout: float = DEFAULT_TIMEOUT) -> int: 246 """Dispatch a Callosum message to matching handlers. 247 248 Handlers are submitted to the thread pool and this function blocks until 249 all handlers complete or timeout. This serializes event processing to 250 ensure handlers finish before the next event is processed. 251 252 Each handler is wrapped in error handling so failures don't affect other 253 handlers or the caller. 
254 255 Args: 256 msg: Callosum message dict with tract, event, and other fields 257 timeout: Maximum seconds to wait per handler (default: 30) 258 259 Returns: 260 Number of handlers invoked 261 """ 262 if _executor is None: 263 logger.debug("Event dispatcher not started, skipping dispatch") 264 return 0 265 266 handlers = _get_handlers(msg) 267 if not handlers: 268 return 0 269 270 tract = msg.get("tract", "") 271 event = msg.get("event", "") 272 273 futures: List[Tuple[str, str, Future]] = [] 274 275 for app_name, handler in handlers: 276 ctx = EventContext( 277 msg=msg, 278 app=app_name, 279 tract=tract, 280 event=event, 281 ) 282 future = _executor.submit(_run_handler, app_name, handler, ctx) 283 futures.append((app_name, handler.__name__, future)) 284 285 # Wait for all handlers with timeout (serializes event processing) 286 for app_name, handler_name, future in futures: 287 try: 288 future.result(timeout=timeout) 289 except TimeoutError: 290 logger.warning( 291 f"Event handler {handler_name} (app: {app_name}) timed out after {timeout}s" 292 ) 293 except Exception as e: 294 # Should not happen since _run_handler catches exceptions 295 logger.error(f"Unexpected error in handler {handler_name}: {e}") 296 297 return len(handlers) 298 299 300def start_dispatcher(workers: int = DEFAULT_WORKERS) -> None: 301 """Start the event dispatcher thread pool. 
302 303 Args: 304 workers: Number of worker threads (default: 4) 305 """ 306 global _executor 307 308 if _executor is not None: 309 logger.debug("Event dispatcher already started") 310 return 311 312 _executor = ThreadPoolExecutor( 313 max_workers=workers, thread_name_prefix="event_handler" 314 ) 315 logger.info(f"Started event dispatcher with {workers} workers") 316 317 318def stop_dispatcher() -> None: 319 """Stop the event dispatcher thread pool gracefully.""" 320 global _executor 321 322 if _executor is None: 323 return 324 325 logger.info("Stopping event dispatcher...") 326 _executor.shutdown(wait=True, cancel_futures=False) 327 _executor = None 328 logger.info("Event dispatcher stopped") 329 330 331def get_handler_count() -> int: 332 """Get the total number of registered handlers. 333 334 Returns: 335 Total handler count across all apps and event patterns 336 """ 337 return sum(len(handlers) for handlers in _handlers.values()) 338 339 340def clear_handlers() -> None: 341 """Clear all registered handlers. Useful for testing.""" 342 _handlers.clear()