personal memory agent
at main 886 lines 26 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""General utilities for solstone.

This module provides core utilities for journal access, date/segment handling,
configuration loading, and CLI setup. Talent-related utilities (prompt loading,
agent configs, etc.) have been moved to think/talent.py.
"""

from __future__ import annotations

import argparse
import copy
import json
import logging
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

from timefhuman import timefhuman

from media import MIME_TYPES

# Journal day directories are named YYYYMMDD; used with fullmatch() throughout.
DATE_RE = re.compile(r"\d{8}")


def now_ms() -> int:
    """Return current time as Unix epoch milliseconds."""
    return int(time.time() * 1000)


# Sentinel "__unset__" distinguishes "not computed yet" from a legitimate
# None result (git unavailable), so failures are cached too.
_rev_cache: str | None = "__unset__"


def get_rev() -> str | None:
    """Return short git commit hash, cached after first call. None if unavailable."""
    global _rev_cache
    if _rev_cache != "__unset__":
        return _rev_cache
    try:
        import subprocess

        result = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True,
            text=True,
            timeout=5,
            # Pin to this file's directory so the hash reflects the solstone
            # repo itself, not whatever repo the caller's CWD happens to be in.
            cwd=str(Path(__file__).resolve().parent),
        )
        _rev_cache = result.stdout.strip() if result.returncode == 0 else None
    except Exception:
        # Best-effort: git missing, timeout, etc. Cache the failure.
        _rev_cache = None
    return _rev_cache


def truncated_echo(text: str, max_bytes: int = 16384) -> None:
    """Print text to stdout, truncating if it exceeds *max_bytes* UTF-8 bytes.

    When the encoded output exceeds the limit it is cut at a clean UTF-8
    character boundary and a warning is written to stderr reporting the
    original size. Pass ``max_bytes=0`` to disable the limit.
    """
    encoded = text.encode("utf-8")
    if max_bytes > 0 and len(encoded) > max_bytes:
        # errors="ignore" drops any multi-byte character split at the cut
        # point, guaranteeing a clean UTF-8 boundary.
        truncated = encoded[:max_bytes].decode("utf-8", errors="ignore")
        sys.stdout.write(truncated)
        sys.stdout.write("\n")
        sys.stderr.write(
            f"[truncated: {len(encoded):,} bytes total, --max {max_bytes:,}]\n"
        )
    else:
        sys.stdout.write(text)
        sys.stdout.write("\n")


def get_journal_info() -> tuple[str, str]:
    """Return the journal path and its source.

    Returns
    -------
    tuple[str, str]
        (path, source) where source is "override" when
        _SOLSTONE_JOURNAL_OVERRIDE is set, otherwise "project".
    """
    override = os.environ.get("_SOLSTONE_JOURNAL_OVERRIDE")
    if override:
        return override, "override"

    project_root = Path(__file__).resolve().parent.parent
    journal = str(project_root / "journal")
    return journal, "project"


def get_journal() -> str:
    """Return the journal path: <project_root>/journal/

    The journal always lives at ./journal/ relative to the solstone
    project root. Auto-creates the directory if it doesn't exist.

    Trust this function — never bypass it, cache its result, or set
    _SOLSTONE_JOURNAL_OVERRIDE from application code. The env var
    exists for external use only (tests, Makefile sandboxes). See
    ``talent/coding/reference/environment.md``.
    """
    # Delegate path resolution to get_journal_info() so the two functions
    # cannot drift apart; this function only adds directory creation.
    journal, _source = get_journal_info()
    os.makedirs(journal, exist_ok=True)
    return journal


def day_path(day: Optional[str] = None) -> Path:
    """Return absolute path for a day directory within the journal.

    Parameters
    ----------
    day : str, optional
        Day in YYYYMMDD format. If None, uses today's date.

    Returns
    -------
    Path
        Absolute path to the day directory.
        Directory is created if it doesn't exist.

    Raises
    ------
    ValueError
        If day format is invalid.
    """
    journal = get_journal()

    # Handle "today" case
    if day is None:
        day = datetime.now().strftime("%Y%m%d")
    elif not DATE_RE.fullmatch(day):
        raise ValueError("day must be in YYYYMMDD format")

    path = Path(journal) / day
    path.mkdir(parents=True, exist_ok=True)
    return path


def day_dirs() -> dict[str, str]:
    """Return mapping of YYYYMMDD day names to absolute paths.

    Returns
    -------
    dict[str, str]
        Mapping of day folder names to their full paths.
        Example: {"20250101": "/path/to/journal/20250101", ...}
    """
    journal = get_journal()

    days: dict[str, str] = {}
    for name in os.listdir(journal):
        if DATE_RE.fullmatch(name):
            path = os.path.join(journal, name)
            if os.path.isdir(path):
                days[name] = path
    return days


def updated_days(exclude: set[str] | None = None) -> list[str]:
    """Return journal days with pending stream data not yet processed daily.

    A day is "updated" when it has a ``health/stream.updated`` marker that is
    newer than its ``health/daily.updated`` marker (or daily.updated is missing).
    Days without ``stream.updated`` are skipped entirely.

    Parameters
    ----------
    exclude : set of str, optional
        Day strings (YYYYMMDD) to skip.

    Returns
    -------
    list of str
        Sorted list of updated day strings.
    """
    days = day_dirs()
    updated: list[str] = []
    for name, path in days.items():
        if exclude and name in exclude:
            continue
        stream = os.path.join(path, "health", "stream.updated")
        if not os.path.isfile(stream):
            continue
        daily = os.path.join(path, "health", "daily.updated")
        if not os.path.isfile(daily):
            # stream data exists but daily processing has never run
            updated.append(name)
            continue
        if os.path.getmtime(stream) > os.path.getmtime(daily):
            updated.append(name)
    updated.sort()
    return updated


def segment_path(day: str, segment: str, stream: str) -> Path:
    """Return absolute path for a segment directory within a stream.

    Parameters
    ----------
    day : str
        Day in YYYYMMDD format.
    segment : str
        Segment key in HHMMSS_LEN format.
    stream : str
        Stream name (e.g., "archon", "import.apple").

    Returns
    -------
    Path
        Absolute path to the segment directory (created if it doesn't exist).
    """
    path = day_path(day) / stream / segment
    path.mkdir(parents=True, exist_ok=True)
    return path


def day_from_path(path: str | Path) -> str | None:
    """Extract the YYYYMMDD day from a journal path.

    Walks up the path's parents and returns the first directory name
    that matches the YYYYMMDD date format.

    Parameters
    ----------
    path : str or Path
        Any path within the journal directory structure.

    Returns
    -------
    str or None
        The YYYYMMDD day string, or None if no date directory is found.
    """
    path = Path(path)
    for parent in (path, *path.parents):
        if DATE_RE.fullmatch(parent.name):
            return parent.name
    return None


def iter_segments(day: str | Path) -> list[tuple[str, str, Path]]:
    """Return all segments in a day, sorted chronologically.

    Traverses the stream directory structure under a day directory and
    returns segment information for all streams.

    Parameters
    ----------
    day : str or Path
        Day in YYYYMMDD format (str) or path to day directory (Path).

    Returns
    -------
    list of (stream_name, segment_key, segment_path) tuples
        Sorted by segment_key across all streams for chronological order.
    """
    if isinstance(day, Path):
        day_dir = day
    else:
        day_dir = day_path(day)

    if not day_dir.exists():
        return []

    results = []
    for entry in day_dir.iterdir():
        if not entry.is_dir():
            continue
        stream_name = entry.name
        for seg_entry in entry.iterdir():
            # Only directories whose names carry a valid HHMMSS_LEN key count
            if seg_entry.is_dir() and segment_key(seg_entry.name):
                results.append((stream_name, seg_entry.name, seg_entry))

    # Segment keys start with HHMMSS, so lexical sort == chronological sort
    results.sort(key=lambda x: x[1])
    return results


def segment_key(name_or_path: str) -> str | None:
    """Extract segment key (HHMMSS_LEN) from any path/filename.

    Parameters
    ----------
    name_or_path : str
        Segment name, filename, or full path containing segment.

    Returns
    -------
    str or None
        Segment key in HHMMSS_LEN format if valid, None otherwise.

    Examples
    --------
    >>> segment_key("143022_300")
    "143022_300"
    >>> segment_key("143022_300_summary.txt")
    "143022_300"
    >>> segment_key("/journal/20250109/143022_300/audio.jsonl")
    "143022_300"
    >>> segment_key("invalid")
    None
    """
    # Match HHMMSS_LEN format: 6 digits, underscore, 1+ digits
    pattern = r"\b(\d{6})_(\d+)(?:_|\b)"
    match = re.search(pattern, name_or_path)
    if match:
        time_part = match.group(1)
        len_part = match.group(2)
        return f"{time_part}_{len_part}"
    return None


def segment_parse(
    name_or_path: str,
) -> tuple[datetime.time, datetime.time] | tuple[None, None]:
    """Parse segment to extract start and end times as datetime objects.

    Parameters
    ----------
    name_or_path : str
        Segment name (e.g., "143022_300") or full path containing segment.

    Returns
    -------
    tuple of (datetime.time, datetime.time) or (None, None)
        Tuple of (start_time, end_time) where:
        - start_time: datetime.time for HHMMSS
        - end_time: datetime.time computed from start + LEN seconds
        Returns (None, None) if not a valid HHMMSS_LEN segment format.

    Examples
    --------
    >>> segment_parse("143022_300")  # 14:30:22 + 300 seconds = 14:35:22
    (datetime.time(14, 30, 22), datetime.time(14, 35, 22))
    >>> segment_parse("/journal/20250109/143022_300/audio.jsonl")
    (datetime.time(14, 30, 22), datetime.time(14, 35, 22))
    >>> segment_parse("invalid")
    (None, None)
    """
    from datetime import time, timedelta

    # Extract just the segment name if it's a path
    if "/" in name_or_path or "\\" in name_or_path:
        path_parts = Path(name_or_path).parts
        # Look for segment key in path parts after a YYYYMMDD day directory.
        # Layout is YYYYMMDD/stream/HHMMSS_LEN/...
        name = None
        for i, part in enumerate(path_parts):
            if part.isdigit() and len(part) == 8:
                # Scan subsequent parts for a segment key
                for j in range(i + 1, len(path_parts)):
                    if segment_key(path_parts[j]):
                        name = path_parts[j]
                        break
            if name:
                break
        if name is None:
            return (None, None)
    else:
        name = name_or_path

    # Validate and extract HHMMSS_LEN from segment name
    if "_" not in name:
        return (None, None)

    parts = name.split("_", 1)  # Split on first underscore only
    if (
        len(parts) != 2
        or not parts[0].isdigit()
        or len(parts[0]) != 6
        or not parts[1].isdigit()
    ):
        return (None, None)

    time_str = parts[0]
    length_str = parts[1]

    # Parse HHMMSS to datetime.time
    try:
        hour = int(time_str[0:2])
        minute = int(time_str[2:4])
        second = int(time_str[4:6])

        # Validate ranges
        if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
            return (None, None)

        start_time = time(hour, minute, second)
    except (ValueError, IndexError):
        return (None, None)

    # Parse LEN and compute end time
    try:
        length_seconds = int(length_str)
        # Compute end time by adding duration (wraps past midnight via .time())
        start_dt = datetime.combine(datetime.today(), start_time)
        end_dt = start_dt + timedelta(seconds=length_seconds)
        end_time = end_dt.time()
        return (start_time, end_time)
    except ValueError:
        return (None, None)


def format_day(day: str) -> str:
    """Format a day string (YYYYMMDD) as a human-readable date.

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        Formatted date like "Friday, January 24, 2026".
        Returns the original string if parsing fails.

    Examples
    --------
    >>> format_day("20260124")
    "Friday, January 24, 2026"
    """
    try:
        dt = datetime.strptime(day, "%Y%m%d")
        return dt.strftime("%A, %B %d, %Y")
    except ValueError:
        return day


def iso_date(day: str) -> str:
    """Convert a day string (YYYYMMDD) to ISO format (YYYY-MM-DD).

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        ISO formatted date like "2026-01-24".
    """
    return f"{day[:4]}-{day[4:6]}-{day[6:8]}"


def format_segment_times(segment: str) -> tuple[str, str] | tuple[None, None]:
    """Format segment start and end times as human-readable strings.

    Parameters
    ----------
    segment:
        Segment key in HHMMSS_LEN format (e.g., "143022_300").

    Returns
    -------
    tuple[str, str] | tuple[None, None]
        Tuple of (start_time, end_time) as formatted strings like "2:30 PM".
        Returns (None, None) if segment format is invalid.

    Examples
    --------
    >>> format_segment_times("143022_300")
    ("2:30 PM", "2:35 PM")
    >>> format_segment_times("090000_3600")
    ("9:00 AM", "10:00 AM")
    """
    start_time, end_time = segment_parse(segment)
    if start_time is None or end_time is None:
        return (None, None)

    return (_format_time(start_time), _format_time(end_time))


def _format_time(t: datetime.time) -> str:
    """Format a time as 12-hour with AM/PM, no leading zero on hour.

    Uses lstrip('0') for cross-platform compatibility (%-I is Unix-only).
    """
    return datetime.combine(datetime.today(), t).strftime("%I:%M %p").lstrip("0")


def _load_default_config() -> dict[str, Any]:
    """Load the default journal configuration from journal_default.json.

    Returns
    -------
    dict
        Default configuration structure.
    """
    default_path = Path(__file__).parent / "journal_default.json"
    with open(default_path, "r", encoding="utf-8") as f:
        return json.load(f)


# Cached default config (loaded once at first use)
_default_config: dict[str, Any] | None = None


def get_config() -> dict[str, Any]:
    """Return the journal configuration from config/journal.json.

    When no journal.json exists, returns a deep copy of the defaults from
    think/journal_default.json. Once journal.json exists it is the master
    and is returned as-is with no merging of defaults.

    Returns
    -------
    dict
        Journal configuration.
    """
    global _default_config
    if _default_config is None:
        _default_config = _load_default_config()

    journal = get_journal()
    config_path = Path(journal) / "config" / "journal.json"

    # Return defaults when no config file exists yet.  Deep copy so callers
    # mutating the result cannot corrupt the cached defaults.
    if not config_path.exists():
        return copy.deepcopy(_default_config)

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError) as exc:
        # Log error but return defaults to avoid breaking callers
        logging.getLogger(__name__).warning(
            "Failed to load config from %s: %s", config_path, exc
        )
        return copy.deepcopy(_default_config)


def _append_task_log(dir_path: str | Path, message: str) -> None:
    """Append ``message`` to ``task_log.txt`` inside ``dir_path``."""
    path = Path(dir_path) / "task_log.txt"
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "a", encoding="utf-8") as f:
            f.write(f"{int(time.time())}\t{message}\n")
    except Exception:
        # Logging is best-effort; never let it break the caller.
        pass


def day_log(day: str, message: str) -> None:
    """Convenience wrapper to log message for ``day``."""
    _append_task_log(str(day_path(day)), message)


def journal_log(message: str) -> None:
    """Append ``message`` to the journal's ``task_log.txt``."""
    _append_task_log(get_journal(), message)


def day_input_summary(day: str) -> str:
    """Return a human-readable summary of recording data available for a day.

    Uses cluster_segments() to detect recording segments and computes
    total duration from segment keys (HHMMSS_LEN format).

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        Human-readable summary like "No recordings", "Light activity: 2 segments,
        ~3 minutes", or "18 segments, ~7.5 hours".
    """
    from think.cluster import cluster_segments

    segments = cluster_segments(day)

    if not segments:
        return "No recordings"

    # Compute total duration from segment keys (HHMMSS_LEN format)
    total_seconds = 0
    for seg in segments:
        key = seg.get("key", "")
        if "_" in key:
            parts = key.split("_")
            if len(parts) >= 2 and parts[1].isdigit():
                total_seconds += int(parts[1])

    # Format duration
    if total_seconds < 60:
        duration_str = f"~{total_seconds} seconds"
    elif total_seconds < 3600:
        minutes = total_seconds / 60
        duration_str = f"~{minutes:.0f} minutes"
    else:
        hours = total_seconds / 3600
        duration_str = f"~{hours:.1f} hours"

    segment_count = len(segments)

    # Categorize activity level
    if segment_count < 5 or total_seconds < 1800:  # < 5 segments or < 30 min
        return f"Light activity: {segment_count} segment{'s' if segment_count != 1 else ''}, {duration_str}"
    else:
        return f"{segment_count} segments, {duration_str}"


def setup_cli(parser: argparse.ArgumentParser, *, parse_known: bool = False):
    """Parse command line arguments and configure logging.

    The parser will be extended with ``-v``/``--verbose`` and ``-d``/``--debug`` flags.
    The journal path is resolved via get_journal() which auto-creates a default path
    if needed. Environment variables from the journal config's ``env`` section
    (in ``journal.json``) are loaded as fallbacks for any keys not already set.
    The parsed arguments are returned. If ``parse_known`` is ``True`` a tuple of
    ``(args, extra)`` is returned using :func:`argparse.ArgumentParser.parse_known_args`.
    """
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose output"
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="Enable debug logging"
    )
    if parse_known:
        args, extra = parser.parse_known_args()
    else:
        args = parser.parse_args()
        extra = None

    if args.debug:
        log_level = logging.DEBUG
    elif args.verbose:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING

    logging.basicConfig(level=log_level)

    # Initialize journal path (auto-creates if needed)
    get_journal()

    # Load config env from journal.json — strict source for API keys
    config = get_config()
    for key, value in config.get("env", {}).items():
        os.environ[key] = str(value)

    return (args, extra) if parse_known else args


def parse_time_range(text: str) -> Optional[tuple[str, str, str]]:
    """Return ``(day, start, end)`` from a natural language time range.

    Parameters
    ----------
    text:
        Natural language description of a time range.

    Returns
    -------
    tuple[str, str, str] | None
        ``(day, start, end)`` if a single range within one day was detected.
        ``day`` is ``YYYYMMDD`` and ``start``/``end`` are ``HHMMSS``. ``None``
        if parsing fails.
    """

    try:
        result = timefhuman(text)
    except Exception as exc:  # pragma: no cover - unexpected library failure
        logging.info("timefhuman failed for %s: %s", text, exc)
        return None

    logging.debug("timefhuman(%s) -> %r", text, result)

    # Guard the type before len(): a non-sequence return from the library
    # would otherwise raise TypeError instead of the documented None.
    if not isinstance(result, (list, tuple)) or len(result) != 1:
        logging.info("timefhuman did not return a single expression for %s", text)
        return None

    range_item = result[0]
    if not isinstance(range_item, tuple) or len(range_item) != 2:
        logging.info("Expected a range from %s but got %r", text, range_item)
        return None

    start_dt, end_dt = range_item
    if start_dt.date() != end_dt.date():
        logging.info("Range must be within a single day: %s -> %s", start_dt, end_dt)
        return None

    day = start_dt.strftime("%Y%m%d")
    start = start_dt.strftime("%H%M%S")
    end = end_dt.strftime("%H%M%S")
    return day, start, end


def get_raw_file(day: str, name: str) -> tuple[str, str, Any]:
    """Return raw file path, mime type and metadata for a transcript.

    Parameters
    ----------
    day:
        Day folder in ``YYYYMMDD`` format.
    name:
        Transcript filename such as ``HHMMSS/audio.jsonl``,
        ``HHMMSS/monitor_1_diff.json``, or ``HHMMSS/screen.jsonl``.

    Returns
    -------
    tuple[str, str, Any]
        ``(path, mime_type, metadata)`` where ``path`` is relative to the day
        directory (read from metadata header), ``mime_type`` is determined
        from the raw file extension, and ``metadata`` contains the parsed
        JSON data (empty on failure).

    Raises
    ------
    ValueError
        If no ``raw`` field can be found in the transcript's metadata.
    """

    day_dir = day_path(day)
    transcript_path = day_dir / name

    rel = None
    meta: Any = {}

    try:
        with open(transcript_path, "r", encoding="utf-8") as f:
            if name.endswith(".jsonl"):
                # First line is metadata header with "raw" field
                first_line = f.readline().strip()
                if first_line:
                    header = json.loads(first_line)
                    rel = header.get("raw")

                # Read remaining lines as metadata
                meta = [json.loads(line) for line in f if line.strip()]
            else:
                # Non-JSONL format (e.g., _diff.json)
                meta = json.load(f)
                rel = meta.get("raw")
    except Exception:  # pragma: no cover - optional metadata
        logging.debug("Failed to read %s", transcript_path)

    if not rel:
        raise ValueError(f"No 'raw' field found in metadata for {name}")

    suffix = Path(rel).suffix.lower()
    mime = {**MIME_TYPES, ".png": "image/png"}.get(suffix, "application/octet-stream")

    return rel, mime, meta


# =============================================================================
# SOL_* Environment Variable Helpers
# =============================================================================


def get_sol_day() -> str | None:
    """Read SOL_DAY from the environment."""
    return os.environ.get("SOL_DAY") or None


def get_sol_facet() -> str | None:
    """Read SOL_FACET from the environment."""
    return os.environ.get("SOL_FACET") or None


def get_sol_segment() -> str | None:
    """Read SOL_SEGMENT from the environment."""
    return os.environ.get("SOL_SEGMENT") or None


def get_sol_stream() -> str | None:
    """Read SOL_STREAM from the environment."""
    return os.environ.get("SOL_STREAM") or None


def get_sol_activity() -> str | None:
    """Read SOL_ACTIVITY from the environment."""
    return os.environ.get("SOL_ACTIVITY") or None


def resolve_sol_day(arg: str | None) -> str:
    """Return *arg* if provided, else SOL_DAY from env, else exit with error.

    Intended for CLI commands where ``day`` is required but can be supplied
    via the SOL_DAY environment variable as a convenience.
    """
    if arg:
        return arg
    env = get_sol_day()
    if env:
        return env
    import typer

    typer.echo("Error: day is required (pass as argument or set SOL_DAY).", err=True)
    raise typer.Exit(1)


def resolve_sol_facet(arg: str | None) -> str:
    """Return *arg* if provided, else SOL_FACET from env, else exit with error.

    Intended for CLI commands where ``facet`` is required but can be supplied
    via the SOL_FACET environment variable as a convenience.
    """
    if arg:
        return arg
    env = get_sol_facet()
    if env:
        return env
    import typer

    typer.echo(
        "Error: facet is required (pass as argument or set SOL_FACET).", err=True
    )
    raise typer.Exit(1)


def resolve_sol_segment(arg: str | None) -> str | None:
    """Return *arg* if provided, else SOL_SEGMENT from env, else None.

    Unlike :func:`resolve_sol_day` this does **not** error when missing
    because segment is typically optional.
    """
    if arg:
        return arg
    return get_sol_segment()


# =============================================================================
# Service Port Discovery
# =============================================================================


def find_available_port(host: str = "127.0.0.1") -> int:
    """Find an available port by binding to port 0.

    Uses the socket bind/getsockname/close pattern to let the OS assign
    an available port.

    Args:
        host: Host address to bind to (default: 127.0.0.1)

    Returns:
        Available port number
    """
    import socket

    # Context manager guarantees the socket is closed even if bind() raises
    # (e.g. the host address is unavailable).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, 0))
        _, port = sock.getsockname()
    return port


def write_service_port(service: str, port: int) -> None:
    """Write a service's port to the health directory.

    Creates journal/health/{service}.port with the port number.

    Args:
        service: Service name (e.g., "convey", "cortex")
        port: Port number to write
    """
    health_dir = Path(get_journal()) / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    port_file = health_dir / f"{service}.port"
    port_file.write_text(str(port))


def read_service_port(service: str) -> int | None:
    """Read a service's port from the health directory.

    Args:
        service: Service name (e.g., "convey", "cortex")

    Returns:
        Port number if file exists and is valid, None otherwise
    """
    port_file = Path(get_journal()) / "health" / f"{service}.port"
    try:
        return int(port_file.read_text().strip())
    except (FileNotFoundError, ValueError):
        return None