personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Server-side event handling framework for apps.
5
6Apps can define event handlers in `events.py` that react to Callosum events.
7Handlers are discovered at Convey startup and dispatched via a thread pool.
8
9Usage in apps/my_app/events.py:
10
11 from apps.events import on_event
12
13 @on_event("observe", "observed")
14 def handle_observation(ctx):
15 day = ctx.msg.get("day")
16 segment = ctx.msg.get("segment")
17 # React to completed observation...
18
19 @on_event("cortex", "finish")
20 def handle_agent_done(ctx):
21 # React to agent completion...
22
23 @on_event("*", "*") # Wildcard - all events
24 def log_all(ctx):
25 # Debug logging...
26
27Handlers receive an EventContext with:
28 - ctx.msg: The raw Callosum message dict
29 - ctx.app: The app name that owns this handler
30 - ctx.tract: Event tract (e.g., "observe")
31 - ctx.event: Event type (e.g., "observed")
32
33Handlers can access journal path via `from convey import state` then `state.journal_root`.
34"""
35
36from __future__ import annotations
37
38import importlib
39import logging
40from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
41from dataclasses import dataclass
42from pathlib import Path
43from typing import Any, Callable, Dict, List, Tuple
44
45logger = logging.getLogger(__name__)
46
47# Default timeout for handler execution (seconds)
48DEFAULT_TIMEOUT = 30.0
49
50# Default number of worker threads
51DEFAULT_WORKERS = 4
52
53
54@dataclass
55class EventContext:
56 """Context passed to event handlers."""
57
58 msg: Dict[str, Any]
59 app: str
60 tract: str
61 event: str
62
63
64# Handler registry: (tract, event) -> [(app_name, handler_fn), ...]
65_handlers: Dict[Tuple[str, str], List[Tuple[str, Callable[[EventContext], None]]]] = {}
66
67# Thread pool for async dispatch
68_executor: ThreadPoolExecutor | None = None
69
70# Track which app is currently being imported (for decorator context)
71_current_app: str | None = None
72
73
74def on_event(tract: str, event: str) -> Callable:
75 """Decorator to register a function as an event handler.
76
77 Args:
78 tract: Callosum tract to match (e.g., "observe", "cortex") or "*" for all
79 event: Event type to match (e.g., "observed", "finish") or "*" for all
80
81 Returns:
82 Decorator function that registers the handler
83
84 Example:
85 @on_event("observe", "observed")
86 def handle_observation(ctx: EventContext):
87 print(f"Segment {ctx.msg['segment']} observed")
88 """
89
90 def decorator(fn: Callable[[EventContext], None]) -> Callable[[EventContext], None]:
91 key = (tract, event)
92 if key not in _handlers:
93 _handlers[key] = []
94
95 # Use current app context from discovery, or infer from module
96 app_name = _current_app
97 if app_name is None:
98 # Fallback: extract from module name (apps.my_app.events -> my_app)
99 module = fn.__module__
100 if module.startswith("apps.") and ".events" in module:
101 parts = module.split(".")
102 if len(parts) >= 2:
103 app_name = parts[1]
104 if app_name is None:
105 app_name = "unknown"
106
107 _handlers[key].append((app_name, fn))
108 logger.debug(
109 f"Registered handler {fn.__name__} for ({tract}, {event}) in app {app_name}"
110 )
111 return fn
112
113 return decorator
114
115
116def discover_handlers() -> int:
117 """Discover and load event handlers from apps/*/events.py.
118
119 This function scans the apps/ directory for events.py files and
120 dynamically imports them, which triggers @on_event decorators.
121
122 Returns:
123 Number of apps with event handlers discovered
124
125 Raises:
126 No exceptions - errors are logged but don't prevent other apps from loading
127 """
128 global _current_app
129
130 apps_dir = Path(__file__).parent
131
132 if not apps_dir.exists():
133 logger.debug("No apps/ directory found, skipping event handler discovery")
134 return 0
135
136 discovered_count = 0
137 total_handlers = 0
138
139 for app_dir in sorted(apps_dir.iterdir()):
140 # Skip non-directories and private directories
141 if not app_dir.is_dir() or app_dir.name.startswith("_"):
142 continue
143
144 events_file = app_dir / "events.py"
145 if not events_file.exists():
146 continue
147
148 app_name = app_dir.name
149
150 try:
151 # Set context for decorator
152 _current_app = app_name
153
154 # Import triggers @on_event decorators
155 module_name = f"apps.{app_name}.events"
156 importlib.import_module(module_name)
157
158 # Count handlers for this app
159 app_handlers = sum(
160 1
161 for handlers in _handlers.values()
162 for (app, _) in handlers
163 if app == app_name
164 )
165
166 discovered_count += 1
167 total_handlers += app_handlers
168 logger.info(f"Loaded {app_handlers} event handler(s) from app: {app_name}")
169 except Exception as e:
170 # Gracefully handle errors - don't break server startup
171 logger.error(
172 f"Failed to load events from app '{app_name}': {e}", exc_info=True
173 )
174 finally:
175 _current_app = None
176
177 if discovered_count > 0:
178 logger.info(
179 f"Discovered {total_handlers} event handler(s) from {discovered_count} app(s)"
180 )
181
182 return discovered_count
183
184
185def _get_handlers(
186 msg: Dict[str, Any],
187) -> List[Tuple[str, Callable[[EventContext], None]]]:
188 """Get all handlers matching the given message.
189
190 Matches exact (tract, event), plus wildcards:
191 - ("*", "*") matches all events
192 - (tract, "*") matches all events in a tract
193 - ("*", event) matches event type across all tracts
194
195 Args:
196 msg: Callosum message with tract and event fields
197
198 Returns:
199 List of (app_name, handler_fn) tuples
200 """
201 tract = msg.get("tract", "")
202 event = msg.get("event", "")
203
204 handlers = []
205
206 # Exact match
207 handlers.extend(_handlers.get((tract, event), []))
208
209 # Wildcard: all events in this tract
210 if (tract, "*") in _handlers:
211 handlers.extend(_handlers[(tract, "*")])
212
213 # Wildcard: this event type in any tract
214 if ("*", event) in _handlers:
215 handlers.extend(_handlers[("*", event)])
216
217 # Wildcard: all events
218 if ("*", "*") in _handlers:
219 handlers.extend(_handlers[("*", "*")])
220
221 return handlers
222
223
224def _run_handler(
225 app_name: str,
226 handler: Callable[[EventContext], None],
227 ctx: EventContext,
228) -> None:
229 """Run a single handler with error handling.
230
231 Args:
232 app_name: Name of the app that owns this handler
233 handler: The handler function to call
234 ctx: Event context to pass to handler
235 """
236 try:
237 handler(ctx)
238 except Exception as e:
239 logger.error(
240 f"Event handler {handler.__name__} (app: {app_name}) failed: {e}",
241 exc_info=True,
242 )
243
244
245def dispatch(msg: Dict[str, Any], timeout: float = DEFAULT_TIMEOUT) -> int:
246 """Dispatch a Callosum message to matching handlers.
247
248 Handlers are submitted to the thread pool and this function blocks until
249 all handlers complete or timeout. This serializes event processing to
250 ensure handlers finish before the next event is processed.
251
252 Each handler is wrapped in error handling so failures don't affect other
253 handlers or the caller.
254
255 Args:
256 msg: Callosum message dict with tract, event, and other fields
257 timeout: Maximum seconds to wait per handler (default: 30)
258
259 Returns:
260 Number of handlers invoked
261 """
262 if _executor is None:
263 logger.debug("Event dispatcher not started, skipping dispatch")
264 return 0
265
266 handlers = _get_handlers(msg)
267 if not handlers:
268 return 0
269
270 tract = msg.get("tract", "")
271 event = msg.get("event", "")
272
273 futures: List[Tuple[str, str, Future]] = []
274
275 for app_name, handler in handlers:
276 ctx = EventContext(
277 msg=msg,
278 app=app_name,
279 tract=tract,
280 event=event,
281 )
282 future = _executor.submit(_run_handler, app_name, handler, ctx)
283 futures.append((app_name, handler.__name__, future))
284
285 # Wait for all handlers with timeout (serializes event processing)
286 for app_name, handler_name, future in futures:
287 try:
288 future.result(timeout=timeout)
289 except TimeoutError:
290 logger.warning(
291 f"Event handler {handler_name} (app: {app_name}) timed out after {timeout}s"
292 )
293 except Exception as e:
294 # Should not happen since _run_handler catches exceptions
295 logger.error(f"Unexpected error in handler {handler_name}: {e}")
296
297 return len(handlers)
298
299
300def start_dispatcher(workers: int = DEFAULT_WORKERS) -> None:
301 """Start the event dispatcher thread pool.
302
303 Args:
304 workers: Number of worker threads (default: 4)
305 """
306 global _executor
307
308 if _executor is not None:
309 logger.debug("Event dispatcher already started")
310 return
311
312 _executor = ThreadPoolExecutor(
313 max_workers=workers, thread_name_prefix="event_handler"
314 )
315 logger.info(f"Started event dispatcher with {workers} workers")
316
317
318def stop_dispatcher() -> None:
319 """Stop the event dispatcher thread pool gracefully."""
320 global _executor
321
322 if _executor is None:
323 return
324
325 logger.info("Stopping event dispatcher...")
326 _executor.shutdown(wait=True, cancel_futures=False)
327 _executor = None
328 logger.info("Event dispatcher stopped")
329
330
331def get_handler_count() -> int:
332 """Get the total number of registered handlers.
333
334 Returns:
335 Total handler count across all apps and event patterns
336 """
337 return sum(len(handlers) for handlers in _handlers.values())
338
339
340def clear_handlers() -> None:
341 """Clear all registered handlers. Useful for testing."""
342 _handlers.clear()