# personal memory agent
# (source viewer header: branch "main", 784 lines, 31 kB)
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Callosum-based agent process manager for solstone. 5 6Cortex listens for agent requests via the Callosum message bus and manages 7agent process lifecycle: 8- Receives requests via Callosum (tract="cortex", event="request") 9- Creates <agent>/<timestamp>_active.jsonl files to track active agents 10- Spawns agent processes and captures their stdout events 11- Broadcasts all agent events back to Callosum 12- Renames to <agent>/<timestamp>.jsonl when complete 13 14Agent files provide persistence and historical record, while Callosum provides 15real-time event distribution to all interested services. 16""" 17 18from __future__ import annotations 19 20import json 21import logging 22import os 23import subprocess 24import sys 25import threading 26import time 27from pathlib import Path 28from typing import Any, Dict, Optional 29 30from think.callosum import CallosumConnection 31from think.runner import _atomic_symlink 32from think.utils import get_journal, get_rev, now_ms 33 34 35class AgentProcess: 36 """Manages a running agent subprocess.""" 37 38 def __init__(self, agent_id: str, process: subprocess.Popen, log_path: Path): 39 self.agent_id = agent_id 40 self.process = process 41 self.log_path = log_path 42 self.stop_event = threading.Event() 43 self.timeout_timer = None # For timeout support 44 self.start_time = time.time() # Track when agent started 45 46 def is_running(self) -> bool: 47 """Check if the agent process is still running.""" 48 return self.process.poll() is None and not self.stop_event.is_set() 49 50 def stop(self) -> None: 51 """Stop the agent process gracefully.""" 52 self.stop_event.set() 53 54 # Cancel timeout timer if it exists 55 if self.timeout_timer: 56 self.timeout_timer.cancel() 57 58 if self.process.poll() is None: 59 # First try SIGTERM for graceful shutdown 60 self.process.terminate() 61 try: 62 self.process.wait(timeout=10) # Give more time for graceful shutdown 63 
except subprocess.TimeoutExpired: 64 logging.getLogger(__name__).warning( 65 f"Agent {self.agent_id} didn't stop gracefully, killing" 66 ) 67 self.process.kill() 68 self.process.wait() # Ensure zombie is reaped 69 70 71class CortexService: 72 """Callosum-based agent process manager.""" 73 74 def __init__(self, journal_path: Optional[str] = None): 75 self.journal_path = Path(journal_path or get_journal()) 76 self.agents_dir = self.journal_path / "agents" 77 self.agents_dir.mkdir(parents=True, exist_ok=True) 78 79 self.logger = logging.getLogger(__name__) 80 self.running_agents: Dict[str, AgentProcess] = {} 81 self.agent_requests: Dict[str, Dict[str, Any]] = {} # Store agent requests 82 self.lock = threading.RLock() 83 self.stop_event = threading.Event() 84 self.shutdown_requested = threading.Event() 85 86 # Callosum connection for receiving requests and broadcasting events 87 self.callosum = CallosumConnection(defaults={"rev": get_rev()}) 88 89 def _create_error_event( 90 self, 91 agent_id: str, 92 error: str, 93 trace: Optional[str] = None, 94 exit_code: Optional[int] = None, 95 ) -> Dict[str, Any]: 96 """Create standardized error event.""" 97 event = { 98 "event": "error", 99 "ts": now_ms(), 100 "agent_id": agent_id, 101 "error": error, 102 } 103 if trace: 104 event["trace"] = trace 105 if exit_code is not None: 106 event["exit_code"] = exit_code 107 return event 108 109 def _recover_orphaned_agents(self, active_files: list) -> None: 110 """Recover orphaned active agent files from a previous crash. 111 112 Appends an error event to each file and renames to completed. 
113 """ 114 for file_path in active_files: 115 agent_id = file_path.stem.replace("_active", "") 116 try: 117 error_event = self._create_error_event( 118 agent_id, "Recovered: Cortex restarted while agent was running" 119 ) 120 with open(file_path, "a") as f: 121 f.write(json.dumps(error_event) + "\n") 122 123 completed_path = file_path.parent / f"{agent_id}.jsonl" 124 file_path.rename(completed_path) 125 self.logger.warning(f"Recovered orphaned agent: {agent_id}") 126 except Exception as e: 127 self.logger.error(f"Failed to recover agent {agent_id}: {e}") 128 129 def start(self) -> None: 130 """Start listening for agent requests via Callosum.""" 131 # Recover any orphaned active files from previous crash 132 active_files = list(self.agents_dir.glob("*/*_active.jsonl")) 133 if active_files: 134 self.logger.warning( 135 f"Found {len(active_files)} orphaned agent(s), recovering..." 136 ) 137 self._recover_orphaned_agents(active_files) 138 139 # Connect to Callosum to receive requests 140 try: 141 self.callosum.start(callback=self._handle_callosum_message) 142 self.logger.info("Connected to Callosum message bus") 143 self.callosum.emit("supervisor", "request", cmd=["sol", "agents", "check"]) 144 self.logger.info("Requested agents health check via supervisor") 145 except Exception as e: 146 self.logger.error(f"Failed to connect to Callosum: {e}") 147 sys.exit(1) 148 149 # Start status emission thread 150 threading.Thread( 151 target=self._emit_periodic_status, 152 name="cortex-status", 153 daemon=True, 154 ).start() 155 156 self.logger.info("Cortex service started, listening for agent requests") 157 158 while True: 159 try: 160 while not self.stop_event.is_set(): 161 time.sleep(1) 162 # Exit when idle during shutdown 163 if self.shutdown_requested.is_set(): 164 with self.lock: 165 if len(self.running_agents) == 0: 166 self.logger.info( 167 "No agents running, exiting gracefully" 168 ) 169 return 170 break 171 except KeyboardInterrupt: 172 self.logger.info("Shutdown 
requested, will exit when idle") 173 self.shutdown_requested.set() 174 175 def _handle_callosum_message(self, message: Dict[str, Any]) -> None: 176 """Handle incoming Callosum messages (callback).""" 177 # Filter for cortex tract and request event 178 if message.get("tract") != "cortex" or message.get("event") != "request": 179 return 180 181 # Handle the request 182 try: 183 self._handle_request(message) 184 except Exception as e: 185 self.logger.exception(f"Error handling request: {e}") 186 187 def _handle_request(self, request: Dict[str, Any]) -> None: 188 """Handle a new agent request from Callosum. 189 190 Cortex is a minimal process manager - it only handles: 191 - File lifecycle (<agent>/<id>_active.jsonl -> <agent>/<id>.jsonl) 192 - Process spawning and monitoring 193 - Event relay to Callosum 194 195 All config loading, validation, and hydration is done by agents.py. 196 """ 197 agent_id = request.get("agent_id") 198 if not agent_id: 199 self.logger.error("Received request without agent_id") 200 return 201 202 # Skip if this agent is already being processed 203 with self.lock: 204 if agent_id in self.running_agents: 205 self.logger.debug(f"Agent {agent_id} already running, skipping") 206 return 207 208 # Create _active.jsonl file (exclusive creation to prevent race conditions) 209 name = request.get("name", "unified") 210 safe_name = name.replace(":", "--") 211 agent_subdir = self.agents_dir / safe_name 212 agent_subdir.mkdir(parents=True, exist_ok=True) 213 file_path = agent_subdir / f"{agent_id}_active.jsonl" 214 if file_path.exists(): 215 self.logger.debug(f"Agent {agent_id} already claimed by another process") 216 return 217 218 try: 219 with open(file_path, "x") as f: # 'x' mode fails if file exists 220 f.write(json.dumps(request) + "\n") 221 except FileExistsError: 222 return 223 224 self.logger.info(f"Processing agent request: {agent_id}") 225 226 # Store request for later use (output writing) 227 with self.lock: 228 self.agent_requests[agent_id] = 
request 229 230 # Spawn agent process - it handles all validation/hydration 231 try: 232 self._spawn_subprocess( 233 agent_id, file_path, request, ["sol", "agents"], "agent" 234 ) 235 except Exception as e: 236 self.logger.exception(f"Failed to spawn agent {agent_id}: {e}") 237 self._write_error_and_complete(file_path, f"Failed to spawn agent: {e}") 238 239 def _spawn_subprocess( 240 self, 241 agent_id: str, 242 file_path: Path, 243 config: Dict[str, Any], 244 cmd: list[str], 245 process_type: str, 246 ) -> None: 247 """Spawn a subprocess and monitor its output. 248 249 Args: 250 agent_id: Unique identifier for this process 251 file_path: Path to the JSONL log file 252 config: Configuration dict to pass via NDJSON stdin 253 cmd: Command to run (e.g., ["sol", "agents"]) 254 process_type: Label for logging ("agent") 255 """ 256 try: 257 # Store the config for later use - thread safe 258 with self.lock: 259 self.agent_requests[agent_id] = config 260 261 # Pass the full config through as NDJSON 262 ndjson_input = json.dumps(config) 263 264 # Prepare environment 265 env = os.environ.copy() 266 267 # Promote top-level config keys to environment so tools can read 268 # them as defaults (e.g., sol call todos add uses SOL_FACET). 269 # Explicit env overrides below take precedence. 270 if config.get("facet"): 271 env["SOL_FACET"] = str(config["facet"]) 272 if config.get("day"): 273 env["SOL_DAY"] = str(config["day"]) 274 275 # Apply explicit env overrides (from dream.py etc.) 
— these win 276 env_overrides = config.get("env") 277 if env_overrides and isinstance(env_overrides, dict): 278 env.update({k: str(v) for k, v in env_overrides.items()}) 279 280 # Spawn the subprocess 281 self.logger.info(f"Spawning {process_type} {agent_id}: {cmd}") 282 self.logger.debug(f"NDJSON input: {ndjson_input}") 283 284 process = subprocess.Popen( 285 cmd, 286 stdin=subprocess.PIPE, 287 stdout=subprocess.PIPE, 288 stderr=subprocess.PIPE, 289 text=True, 290 env=env, 291 bufsize=1, 292 ) 293 294 # Send input and close stdin 295 process.stdin.write(ndjson_input + "\n") 296 process.stdin.close() 297 298 # Track the running process 299 agent = AgentProcess(agent_id, process, file_path) 300 with self.lock: 301 self.running_agents[agent_id] = agent 302 303 # Set up timeout (default to 10 minutes if not specified) 304 timeout_seconds = config.get("timeout_seconds", 600) 305 agent.timeout_timer = threading.Timer( 306 timeout_seconds, 307 lambda: self._timeout_agent(agent_id, agent, timeout_seconds), 308 ) 309 agent.timeout_timer.start() 310 311 # Start monitoring threads 312 threading.Thread( 313 target=self._monitor_stdout, args=(agent,), daemon=True 314 ).start() 315 316 threading.Thread( 317 target=self._monitor_stderr, args=(agent,), daemon=True 318 ).start() 319 320 self.logger.info( 321 f"{process_type.capitalize()} {agent_id} spawned successfully " 322 f"(PID: {process.pid})" 323 ) 324 325 except Exception as e: 326 self.logger.exception(f"Failed to spawn {process_type} {agent_id}: {e}") 327 self._write_error_and_complete( 328 file_path, f"Failed to spawn {process_type}: {e}" 329 ) 330 331 def _timeout_agent( 332 self, agent_id: str, agent: AgentProcess, timeout_seconds: int 333 ) -> None: 334 """Handle agent timeout.""" 335 if agent.is_running(): 336 self.logger.warning( 337 f"Agent {agent_id} timed out after {timeout_seconds} seconds" 338 ) 339 error_event = self._create_error_event( 340 agent_id, f"Agent timed out after {timeout_seconds} seconds" 341 ) 
342 try: 343 with open(agent.log_path, "a") as f: 344 f.write(json.dumps(error_event) + "\n") 345 except Exception as e: 346 self.logger.error(f"Failed to write timeout event: {e}") 347 348 # Broadcast to callosum so wait_for_agents detects immediately 349 try: 350 event_copy = error_event.copy() 351 event_type = event_copy.pop("event", "error") 352 self.callosum.emit("cortex", event_type, **event_copy) 353 except Exception: 354 pass 355 356 agent.stop() 357 358 def _monitor_stdout(self, agent: AgentProcess) -> None: 359 """Monitor agent stdout and append events to the JSONL file.""" 360 if not agent.process.stdout: 361 return 362 363 try: 364 with agent.process.stdout: 365 for line in agent.process.stdout: 366 if not line: 367 continue 368 369 line = line.strip() 370 if not line: 371 continue 372 373 try: 374 # Parse JSON event 375 event = json.loads(line) 376 377 # Ensure event has timestamp and agent_id 378 if "ts" not in event: 379 event["ts"] = now_ms() 380 if "agent_id" not in event: 381 event["agent_id"] = agent.agent_id 382 383 # Inject agent name for WebSocket consumers 384 with self.lock: 385 _req = self.agent_requests.get(agent.agent_id) 386 if _req and "name" not in event: 387 event["name"] = _req.get("name", "") 388 389 # Append to JSONL file 390 with open(agent.log_path, "a") as f: 391 f.write(json.dumps(event) + "\n") 392 393 # Broadcast event to Callosum 394 try: 395 event_copy = event.copy() 396 event_type = event_copy.pop("event", "unknown") 397 self.callosum.emit("cortex", event_type, **event_copy) 398 except Exception as e: 399 self.logger.info( 400 f"Failed to broadcast event to Callosum: {e}" 401 ) 402 403 # Handle start event 404 if event.get("event") == "start": 405 # Capture model and provider for status reporting 406 with self.lock: 407 if agent.agent_id in self.agent_requests: 408 model = event.get("model") 409 if model: 410 self.agent_requests[agent.agent_id]["model"] = ( 411 model 412 ) 413 provider = event.get("provider") 414 if 
provider: 415 self.agent_requests[agent.agent_id]["provider"] = ( 416 provider 417 ) 418 419 # Handle finish or error event 420 if event.get("event") in ["finish", "error"]: 421 # Check for output (only on finish) 422 if event.get("event") == "finish": 423 result = event.get("result", "") 424 425 # Get original request (thread-safe access) 426 with self.lock: 427 original_request = self.agent_requests.get( 428 agent.agent_id 429 ) 430 431 # Log token usage if available 432 usage_data = event.get("usage") 433 if usage_data and original_request: 434 try: 435 from think.models import log_token_usage 436 from think.talent import key_to_context 437 438 model = original_request.get("model", "unknown") 439 name = original_request.get("name", "unknown") 440 context = key_to_context(name) 441 442 # Extract segment from env if set (flat merge puts env at top level) 443 env_config = original_request.get("env", {}) 444 segment = ( 445 env_config.get("SOL_SEGMENT") 446 if env_config 447 else None 448 ) 449 450 log_token_usage( 451 model=model, 452 usage=usage_data, 453 context=context, 454 segment=segment, 455 type="cogitate", 456 ) 457 except Exception as e: 458 self.logger.warning( 459 f"Failed to log token usage for agent {agent.agent_id}: {e}" 460 ) 461 462 # Write output if requested 463 if original_request and original_request.get("output"): 464 self._write_output( 465 agent.agent_id, 466 result, 467 original_request, 468 ) 469 470 # Break to trigger cleanup 471 break 472 473 except json.JSONDecodeError: 474 # Non-JSON output becomes info event 475 info_event = { 476 "event": "info", 477 "ts": now_ms(), 478 "message": line, 479 "agent_id": agent.agent_id, 480 } 481 with open(agent.log_path, "a") as f: 482 f.write(json.dumps(info_event) + "\n") 483 484 except Exception as e: 485 self.logger.error( 486 f"Error monitoring stdout for agent {agent.agent_id}: {e}" 487 ) 488 finally: 489 # Wait for process to fully exit (reaps zombie) 490 exit_code = agent.process.wait() 491 
self.logger.info(f"Agent {agent.agent_id} exited with code {exit_code}") 492 493 # Check if finish event was emitted 494 has_finish = self._has_finish_event(agent.log_path) 495 496 if not has_finish: 497 # Write error event if no finish using standardized format 498 error_event = self._create_error_event( 499 agent.agent_id, 500 f"Agent exited with code {exit_code} without finish event", 501 exit_code=exit_code, 502 ) 503 with open(agent.log_path, "a") as f: 504 f.write(json.dumps(error_event) + "\n") 505 506 # Complete the file (rename from _active.jsonl to .jsonl) 507 self._complete_agent_file(agent.agent_id, agent.log_path) 508 509 # Remove from running agents and clean up stored request (thread-safe) 510 with self.lock: 511 if agent.agent_id in self.running_agents: 512 del self.running_agents[agent.agent_id] 513 # Clean up stored request 514 if agent.agent_id in self.agent_requests: 515 del self.agent_requests[agent.agent_id] 516 517 def _monitor_stderr(self, agent: AgentProcess) -> None: 518 """Monitor agent stderr for errors.""" 519 if not agent.process.stderr: 520 return 521 522 stderr_lines = [] 523 try: 524 with agent.process.stderr: 525 for line in agent.process.stderr: 526 if not line: 527 continue 528 stripped = line.strip() 529 if stripped: 530 stderr_lines.append(stripped) 531 # Pass through to cortex stderr with agent prefix for traceability 532 print( 533 f"[agent:{agent.agent_id}:stderr] {stripped}", 534 file=sys.stderr, 535 flush=True, 536 ) 537 538 except Exception as e: 539 self.logger.error( 540 f"Error monitoring stderr for agent {agent.agent_id}: {e}" 541 ) 542 finally: 543 # If process failed with stderr output, write error event 544 if stderr_lines: 545 exit_code = agent.process.poll() 546 if exit_code is not None and exit_code != 0: 547 error_event = self._create_error_event( 548 agent.agent_id, 549 "Process failed with stderr output", 550 trace="\n".join(stderr_lines), 551 exit_code=exit_code, 552 ) 553 try: 554 with open(agent.log_path, 
"a") as f: 555 f.write(json.dumps(error_event) + "\n") 556 except Exception as e: 557 self.logger.warning(f"Failed to write stderr event: {e}") 558 559 def _has_finish_event(self, file_path: Path) -> bool: 560 """Check if the JSONL file contains a finish or error event.""" 561 try: 562 with open(file_path, "r") as f: 563 for line in f: 564 try: 565 event = json.loads(line) 566 if event.get("event") in ["finish", "error"]: 567 return True 568 except json.JSONDecodeError: 569 continue 570 except Exception: 571 pass 572 return False 573 574 def _complete_agent_file(self, agent_id: str, file_path: Path) -> None: 575 """Complete an agent by renaming the file from _active.jsonl to .jsonl.""" 576 try: 577 completed_path = file_path.parent / f"{agent_id}.jsonl" 578 file_path.rename(completed_path) 579 self.logger.info(f"Completed agent {agent_id}: {completed_path}") 580 581 # Create convenience symlink: {name}.log -> {name}/{agent_id}.jsonl 582 request = self.agent_requests.get(agent_id) 583 if request: 584 name = request.get("name") 585 if name: 586 safe_name = name.replace(":", "--") 587 link_path = self.agents_dir / f"{safe_name}.log" 588 _atomic_symlink(link_path, f"{safe_name}/{agent_id}.jsonl") 589 self.logger.debug( 590 f"Symlinked {safe_name}.log -> {safe_name}/{agent_id}.jsonl" 591 ) 592 593 # Append summary to day index 594 self._append_day_index(agent_id, request, completed_path) 595 else: 596 self.logger.debug( 597 f"No name in request for {agent_id}, skipping symlink" 598 ) 599 except Exception as e: 600 self.logger.error(f"Failed to complete agent file {agent_id}: {e}") 601 602 def _append_day_index( 603 self, agent_id: str, request: Dict[str, Any], completed_path: Path 604 ) -> None: 605 """Append agent summary to day index file.""" 606 try: 607 # Determine day from request or agent_id timestamp 608 day = request.get("day") 609 if not day: 610 from datetime import datetime 611 612 ts_seconds = int(agent_id) / 1000 613 day = 
datetime.fromtimestamp(ts_seconds).strftime("%Y%m%d") 614 615 start_ts = request.get("ts", 0) 616 617 # Read last few lines to find finish/error event for runtime 618 runtime_seconds = None 619 status = "completed" 620 try: 621 with open(completed_path, "r") as f: 622 lines = f.readlines() 623 for line in reversed(lines[-10:]): 624 line = line.strip() 625 if not line: 626 continue 627 try: 628 event = json.loads(line) 629 event_type = event.get("event") 630 if event_type == "finish": 631 end_ts = event.get("ts", 0) 632 if end_ts and start_ts: 633 runtime_seconds = round((end_ts - start_ts) / 1000.0, 1) 634 break 635 if event_type == "error": 636 status = "error" 637 end_ts = event.get("ts", 0) 638 if end_ts and start_ts: 639 runtime_seconds = round((end_ts - start_ts) / 1000.0, 1) 640 break 641 except json.JSONDecodeError: 642 continue 643 except Exception: 644 pass 645 646 summary = { 647 "agent_id": agent_id, 648 "name": request.get("name", "unified"), 649 "day": day, 650 "facet": request.get("facet"), 651 "ts": start_ts, 652 "status": status, 653 "runtime_seconds": runtime_seconds, 654 "provider": request.get("provider"), 655 "model": request.get("model"), 656 "schedule": request.get("schedule"), 657 } 658 659 day_index_path = self.agents_dir / f"{day}.jsonl" 660 with open(day_index_path, "a") as f: 661 f.write(json.dumps(summary) + "\n") 662 f.flush() 663 664 except Exception as e: 665 self.logger.error(f"Failed to append day index for {agent_id}: {e}") 666 667 def _write_error_and_complete(self, file_path: Path, error_message: str) -> None: 668 """Write an error event to the file and mark it as complete.""" 669 try: 670 agent_id = file_path.stem.replace("_active", "") 671 error_event = self._create_error_event(agent_id, error_message) 672 with open(file_path, "a") as f: 673 f.write(json.dumps(error_event) + "\n") 674 675 # Complete the file 676 self._complete_agent_file(agent_id, file_path) 677 except Exception as e: 678 self.logger.error(f"Failed to write 
error and complete: {e}") 679 680 def _write_output(self, agent_id: str, result: str, config: Dict[str, Any]) -> None: 681 """Write agent output to config["output_path"]. 682 683 The output path is set by the caller — either derived by 684 prepare_config in agents.py (day/segment agents) or computed 685 by dream.py via get_activity_output_path (activity agents). 686 Cortex does not derive paths itself. 687 """ 688 output_path_str = config.get("output_path") 689 if not output_path_str: 690 return 691 692 try: 693 output_path = Path(output_path_str) 694 output_path.parent.mkdir(parents=True, exist_ok=True) 695 696 with open(output_path, "w", encoding="utf-8") as f: 697 f.write(result) 698 699 self.logger.info(f"Wrote agent {agent_id} output to {output_path}") 700 701 except Exception as e: 702 self.logger.error(f"Failed to write agent {agent_id} output: {e}") 703 704 def stop(self) -> None: 705 """Stop the Cortex service.""" 706 self.stop_event.set() 707 708 # Close Callosum connection 709 if self.callosum: 710 self.callosum.stop() 711 712 # Stop all running agents 713 with self.lock: 714 for agent in self.running_agents.values(): 715 agent.stop() 716 717 def _emit_periodic_status(self) -> None: 718 """Emit status events every 5 seconds (runs in background thread).""" 719 while not self.stop_event.is_set(): 720 try: 721 with self.lock: 722 agents = [] 723 for agent_id, agent_proc in self.running_agents.items(): 724 config = self.agent_requests.get(agent_id, {}) 725 agents.append( 726 { 727 "agent_id": agent_id, 728 "name": config.get("name", "unknown"), 729 "provider": config.get("provider", "unknown"), 730 "elapsed_seconds": int( 731 time.time() - agent_proc.start_time 732 ), 733 } 734 ) 735 736 # Only emit status when there are active agents 737 if agents: 738 self.callosum.emit( 739 "cortex", 740 "status", 741 running_agents=len(agents), 742 agents=agents, 743 ) 744 except Exception as e: 745 self.logger.debug(f"Status emission failed: {e}") 746 747 time.sleep(5) 
748 749 def get_status(self) -> Dict[str, Any]: 750 """Get service status information.""" 751 with self.lock: 752 return { 753 "running_agents": len(self.running_agents), 754 "agent_ids": list(self.running_agents.keys()), 755 } 756 757 758def main() -> None: 759 """CLI entry point for the Cortex service.""" 760 import argparse 761 762 from think.utils import setup_cli 763 764 parser = argparse.ArgumentParser(description="solstone Cortex Agent Manager") 765 args = setup_cli(parser) 766 767 # Set up logging 768 logging.basicConfig( 769 level=logging.INFO if not args.verbose else logging.DEBUG, 770 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 771 ) 772 773 # Start the service 774 cortex = CortexService() 775 776 try: 777 cortex.start() 778 except KeyboardInterrupt: 779 logging.getLogger(__name__).info("Shutting down Cortex service") 780 cortex.stop() 781 782 783if __name__ == "__main__": 784 main()