personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""General utilities for solstone.
5
6This module provides core utilities for journal access, date/segment handling,
7configuration loading, and CLI setup. Talent-related utilities (prompt loading,
8agent configs, etc.) have been moved to think/talent.py.
9"""
10
11from __future__ import annotations
12
13import argparse
14import copy
15import json
16import logging
17import os
18import re
19import sys
20import time
21from datetime import datetime
22from pathlib import Path
23from typing import Any, Optional
24
25from timefhuman import timefhuman
26
27from media import MIME_TYPES
28
29DATE_RE = re.compile(r"\d{8}")
30
31
def now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the Unix epoch."""
    seconds = time.time()
    # int() truncates the fractional millisecond rather than rounding.
    return int(seconds * 1000)
35
36
# "__unset__" marks "not looked up yet"; None is a valid cached result
# meaning the revision could not be determined.
_rev_cache: str | None = "__unset__"


def get_rev() -> str | None:
    """Return the short git commit hash of HEAD, or None if it cannot be read.

    The lookup runs ``git rev-parse --short HEAD`` at most once per process;
    the outcome (including a None failure result) is cached in ``_rev_cache``.
    """
    global _rev_cache
    if _rev_cache == "__unset__":
        rev = None
        try:
            import subprocess

            proc = subprocess.run(
                ["git", "rev-parse", "--short", "HEAD"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if proc.returncode == 0:
                rev = proc.stdout.strip()
        except Exception:
            # git missing, timeout, etc. — treat all as "revision unavailable".
            rev = None
        _rev_cache = rev
    return _rev_cache
58
59
def truncated_echo(text: str, max_bytes: int = 16384) -> None:
    """Write *text* plus a newline to stdout, capped at *max_bytes* UTF-8 bytes.

    If the UTF-8 encoding of *text* is longer than *max_bytes*, only the
    leading *max_bytes* bytes are emitted (any partial trailing character is
    dropped) and a one-line notice with the original size goes to stderr.
    A *max_bytes* of zero or less disables the cap.
    """
    raw = text.encode("utf-8")
    total = len(raw)

    if max_bytes <= 0 or total <= max_bytes:
        sys.stdout.write(text)
        sys.stdout.write("\n")
        return

    # errors="ignore" discards a multi-byte character split by the slice.
    clipped = raw[:max_bytes].decode("utf-8", errors="ignore")
    sys.stdout.write(clipped)
    sys.stdout.write("\n")
    sys.stderr.write(f"[truncated: {total:,} bytes total, --max {max_bytes:,}]\n")
78
79
def get_journal_info() -> tuple[str, str]:
    """Report where the journal lives and why, without creating anything.

    Returns
    -------
    tuple[str, str]
        ``(path, source)``. ``source`` is ``"override"`` when the
        ``_SOLSTONE_JOURNAL_OVERRIDE`` environment variable is non-empty
        (``path`` is then its value), otherwise ``"project"`` and ``path`` is
        the ``journal`` directory two levels above this file.
    """
    env_path = os.environ.get("_SOLSTONE_JOURNAL_OVERRIDE")
    if not env_path:
        root = Path(__file__).resolve().parent.parent
        return str(root / "journal"), "project"
    return env_path, "override"
96
97
def get_journal() -> str:
    """Return the journal directory path, creating it if necessary.

    By default this is the ``journal`` directory under the solstone project
    root (two levels above this file). A non-empty
    ``_SOLSTONE_JOURNAL_OVERRIDE`` environment variable replaces that path.

    Trust this function — never bypass it, cache its result, or set
    _SOLSTONE_JOURNAL_OVERRIDE from application code. The env var
    exists for external use only (tests, Makefile sandboxes). See
    ``talent/coding/reference/environment.md``.
    """
    target = os.environ.get("_SOLSTONE_JOURNAL_OVERRIDE")
    if not target:
        target = str(Path(__file__).resolve().parent.parent / "journal")
    os.makedirs(target, exist_ok=True)
    return target
118
119
def day_path(day: Optional[str] = None) -> Path:
    """Return the directory for *day* inside the journal, creating it on demand.

    Parameters
    ----------
    day : str, optional
        Day as eight digits (YYYYMMDD). When None, today's local date is used.

    Returns
    -------
    Path
        Path to the day directory under the journal; it exists on return.

    Raises
    ------
    ValueError
        If *day* is given but is not exactly eight digits.
    """
    root = Path(get_journal())

    if day is None:
        day = datetime.now().strftime("%Y%m%d")
    elif DATE_RE.fullmatch(day) is None:
        raise ValueError("day must be in YYYYMMDD format")

    day_dir = root / day
    day_dir.mkdir(parents=True, exist_ok=True)
    return day_dir
149
150
def day_dirs() -> dict[str, str]:
    """Map every day folder in the journal to its full path.

    Returns
    -------
    dict[str, str]
        Keys are eight-digit folder names found directly under the journal;
        values are the joined paths. Non-directories are skipped.
        Example: {"20250101": "/path/to/journal/20250101", ...}
    """
    root = get_journal()
    candidates = (
        (entry, os.path.join(root, entry))
        for entry in os.listdir(root)
        if DATE_RE.fullmatch(entry)
    )
    return {entry: full for entry, full in candidates if os.path.isdir(full)}
169
170
def updated_days(exclude: set[str] | None = None) -> list[str]:
    """List days whose stream marker is newer than their daily marker.

    For each day directory the markers ``health/stream.updated`` and
    ``health/daily.updated`` are compared by modification time. A day is
    reported when ``stream.updated`` exists and ``daily.updated`` is either
    missing or strictly older. Days lacking ``stream.updated`` never appear.

    Parameters
    ----------
    exclude : set of str, optional
        Day names (YYYYMMDD) to leave out of the result.

    Returns
    -------
    list of str
        Matching day names in ascending order.
    """
    skip = exclude or ()
    pending: list[str] = []

    for name, path in day_dirs().items():
        if name in skip:
            continue

        health = os.path.join(path, "health")
        stream_marker = os.path.join(health, "stream.updated")
        if not os.path.isfile(stream_marker):
            continue

        daily_marker = os.path.join(health, "daily.updated")
        if not os.path.isfile(daily_marker) or (
            os.path.getmtime(stream_marker) > os.path.getmtime(daily_marker)
        ):
            pending.append(name)

    return sorted(pending)
204
205
def segment_path(day: str, segment: str, stream: str) -> Path:
    """Return the directory ``<day>/<stream>/<segment>``, creating it if needed.

    Parameters
    ----------
    day : str
        Day in YYYYMMDD format.
    segment : str
        Segment key in HHMMSS_LEN format.
    stream : str
        Stream name (e.g., "archon", "import.apple").

    Returns
    -------
    Path
        Path to the segment directory; it exists on return.
    """
    seg_dir = day_path(day).joinpath(stream, segment)
    seg_dir.mkdir(parents=True, exist_ok=True)
    return seg_dir
226
227
def day_from_path(path: str | Path) -> str | None:
    """Find the YYYYMMDD component of a journal path.

    The final path component is checked first, then each ancestor from the
    innermost outward; the first name made of exactly eight digits wins.

    Parameters
    ----------
    path : str or Path
        Any path within the journal directory structure.

    Returns
    -------
    str or None
        The matching eight-digit name, or None when no component matches.
    """
    target = Path(path)
    names = (node.name for node in (target, *target.parents))
    return next((name for name in names if DATE_RE.fullmatch(name)), None)
249
250
def iter_segments(day: str | Path) -> list[tuple[str, str, Path]]:
    """Collect every segment directory of a day across all streams.

    The day directory is expected to contain one sub-directory per stream,
    each holding segment directories. A sub-directory counts as a segment
    when ``segment_key`` finds an HHMMSS_LEN key in its name.

    Parameters
    ----------
    day : str or Path
        A YYYYMMDD string (resolved through ``day_path``) or the day
        directory itself as a Path.

    Returns
    -------
    list of (stream_name, segment_name, segment_path) tuples
        Ordered by segment directory name, which sorts chronologically for
        HHMMSS-prefixed names. Empty if the day directory does not exist.
    """
    day_dir = day if isinstance(day, Path) else day_path(day)
    if not day_dir.exists():
        return []

    found = [
        (stream_dir.name, seg_dir.name, seg_dir)
        for stream_dir in day_dir.iterdir()
        if stream_dir.is_dir()
        for seg_dir in stream_dir.iterdir()
        if seg_dir.is_dir() and segment_key(seg_dir.name)
    ]
    return sorted(found, key=lambda item: item[1])
286
287
def segment_key(name_or_path: str) -> str | None:
    """Pull the first HHMMSS_LEN segment key out of a name or path.

    Parameters
    ----------
    name_or_path : str
        Segment name, filename, or full path containing a segment.

    Returns
    -------
    str or None
        The key as ``"<6 digits>_<digits>"``, or None if no such token occurs.

    Examples
    --------
    >>> segment_key("143022_300")
    '143022_300'
    >>> segment_key("143022_300_summary.txt")
    '143022_300'
    >>> segment_key("/journal/20250109/143022_300/audio.jsonl")
    '143022_300'
    >>> segment_key("invalid") is None
    True
    """
    # Six digits at a word boundary, an underscore, then a digit run that is
    # followed by either another underscore or a word boundary.
    found = re.search(r"\b(\d{6})_(\d+)(?:_|\b)", name_or_path)
    if found is None:
        return None
    return "_".join(found.group(1, 2))
320
321
def segment_parse(
    name_or_path: str,
) -> tuple[datetime.time, datetime.time] | tuple[None, None]:
    """Parse a segment into its start and end times of day.

    Parameters
    ----------
    name_or_path : str
        Segment name (e.g., "143022_300") or a path containing one. For
        paths, the segment is the first component after a YYYYMMDD component
        in which ``segment_key`` finds a key; that component must itself be
        exactly ``HHMMSS_LEN``.

    Returns
    -------
    tuple of (datetime.time, datetime.time) or (None, None)
        ``(start_time, end_time)`` where ``start_time`` is HHMMSS and
        ``end_time`` is start plus LEN seconds. Only the time of day is
        returned, so an end past midnight wraps around. ``(None, None)`` is
        returned for anything that is not a valid HHMMSS_LEN segment,
        including a LEN too large to represent.

    Examples
    --------
    >>> segment_parse("143022_300")  # 14:30:22 + 300 seconds = 14:35:22
    (datetime.time(14, 30, 22), datetime.time(14, 35, 22))
    >>> segment_parse("/journal/20250109/143022_300/audio.jsonl")
    (datetime.time(14, 30, 22), datetime.time(14, 35, 22))
    >>> segment_parse("invalid")
    (None, None)
    """
    from datetime import time, timedelta

    # Reduce a path to the single component that names the segment.
    if "/" in name_or_path or "\\" in name_or_path:
        path_parts = Path(name_or_path).parts
        name = None
        # Layout is YYYYMMDD/stream/HHMMSS_LEN/..., so only look at parts
        # that follow a day directory.
        for i, part in enumerate(path_parts):
            if part.isdigit() and len(part) == 8:
                name = next(
                    (cand for cand in path_parts[i + 1 :] if segment_key(cand)),
                    None,
                )
                if name:
                    break
        if name is None:
            return (None, None)
    else:
        name = name_or_path

    # Split on the first underscore only: "<HHMMSS>_<LEN>".
    time_str, sep, length_str = name.partition("_")
    if (
        not sep
        or len(time_str) != 6
        or not time_str.isdigit()
        or not length_str.isdigit()
    ):
        return (None, None)

    try:
        # time() rejects out-of-range hour/minute/second with ValueError.
        start_time = time(int(time_str[0:2]), int(time_str[2:4]), int(time_str[4:6]))
        length_seconds = int(length_str)
        start_dt = datetime.combine(datetime.today(), start_time)
        end_time = (start_dt + timedelta(seconds=length_seconds)).time()
    except (ValueError, OverflowError):
        # OverflowError: LEN exceeds what timedelta/datetime can represent.
        return (None, None)

    return (start_time, end_time)
411
412
def format_day(day: str) -> str:
    """Render a YYYYMMDD day string as a long, human-readable date.

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        Date formatted as weekday, month name, zero-padded day and year,
        e.g. "Saturday, January 24, 2026". If *day* cannot be parsed it is
        returned unchanged.

    Examples
    --------
    >>> format_day("20260124")
    'Saturday, January 24, 2026'
    """
    try:
        return datetime.strptime(day, "%Y%m%d").strftime("%A, %B %d, %Y")
    except ValueError:
        return day
437
438
def iso_date(day: str) -> str:
    """Insert dashes into a YYYYMMDD string to produce YYYY-MM-DD.

    The input is sliced positionally and not validated.

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        ISO formatted date like "2026-01-24".
    """
    year, month, dom = day[:4], day[4:6], day[6:8]
    return f"{year}-{month}-{dom}"
453
454
def format_segment_times(segment: str) -> tuple[str, str] | tuple[None, None]:
    """Return a segment's start and end as 12-hour clock strings.

    Parameters
    ----------
    segment:
        Segment key in HHMMSS_LEN format (e.g., "143022_300").

    Returns
    -------
    tuple[str, str] | tuple[None, None]
        ``(start, end)`` formatted like "2:30 PM", or ``(None, None)`` when
        ``segment_parse`` cannot parse the segment.

    Examples
    --------
    >>> format_segment_times("143022_300")
    ('2:30 PM', '2:35 PM')
    >>> format_segment_times("090000_3600")
    ('9:00 AM', '10:00 AM')
    """
    begin, finish = segment_parse(segment)
    if begin is not None and finish is not None:
        return (_format_time(begin), _format_time(finish))
    return (None, None)
481
482
483def _format_time(t: datetime.time) -> str:
484 """Format a time as 12-hour with AM/PM, no leading zero on hour.
485
486 Uses lstrip('0') for cross-platform compatibility (%-I is Unix-only).
487 """
488 return datetime.combine(datetime.today(), t).strftime("%I:%M %p").lstrip("0")
489
490
def _load_default_config() -> dict[str, Any]:
    """Read and parse ``journal_default.json`` located next to this module.

    Returns
    -------
    dict
        The parsed default configuration.
    """
    defaults_file = Path(__file__).parent / "journal_default.json"
    with defaults_file.open("r", encoding="utf-8") as handle:
        return json.load(handle)
502
503
# Cached default config (loaded once at first use)
_default_config: dict[str, Any] | None = None


def get_config() -> dict[str, Any]:
    """Return the journal configuration from ``config/journal.json``.

    When no journal.json exists, a deep copy of the defaults loaded by
    ``_load_default_config`` is returned. Once journal.json exists it is the
    master and is returned as parsed, with no merging of defaults. If the
    file cannot be read or parsed (I/O error, invalid JSON, or invalid
    UTF-8), a warning is logged and a deep copy of the defaults is returned.

    Returns
    -------
    dict
        Journal configuration.
    """
    global _default_config
    if _default_config is None:
        _default_config = _load_default_config()

    config_path = Path(get_journal()) / "config" / "journal.json"

    # Return defaults when no config file exists yet
    if not config_path.exists():
        return copy.deepcopy(_default_config)

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (ValueError, OSError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError;
        # either way fall back to defaults rather than break callers.
        logging.getLogger(__name__).warning(
            "Failed to load config from %s: %s", config_path, exc
        )
        return copy.deepcopy(_default_config)
540
541
542def _append_task_log(dir_path: str | Path, message: str) -> None:
543 """Append ``message`` to ``task_log.txt`` inside ``dir_path``."""
544 path = Path(dir_path) / "task_log.txt"
545 try:
546 path.parent.mkdir(parents=True, exist_ok=True)
547 with open(path, "a", encoding="utf-8") as f:
548 f.write(f"{int(time.time())}\t{message}\n")
549 except Exception:
550 pass
551
552
def day_log(day: str, message: str) -> None:
    """Append ``message`` to the ``task_log.txt`` of the directory for ``day``."""
    target_dir = str(day_path(day))
    _append_task_log(target_dir, message)
556
557
def journal_log(message: str) -> None:
    """Append ``message`` to ``task_log.txt`` at the top level of the journal."""
    journal_dir = get_journal()
    _append_task_log(journal_dir, message)
561
562
def day_input_summary(day: str) -> str:
    """Summarize how much recording data exists for a day in one short phrase.

    Segments come from ``think.cluster.cluster_segments(day)``. The total
    duration is the sum of the LEN part of each segment's ``"key"``
    (HHMMSS_LEN); keys without a numeric LEN contribute nothing.

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        "No recordings" when there are no segments; a "Light activity: ..."
        phrase when there are fewer than 5 segments or under 30 minutes in
        total; otherwise "<count> segments, <duration>", e.g.
        "18 segments, ~7.5 hours".
    """
    from think.cluster import cluster_segments

    segments = cluster_segments(day)
    if not segments:
        return "No recordings"

    # Sum the LEN component of every segment key.
    total_seconds = 0
    for seg in segments:
        key = seg.get("key", "")
        if "_" not in key:
            continue
        length = key.split("_")[1]
        if length.isdigit():
            total_seconds += int(length)

    # Pick the coarsest unit that fits.
    if total_seconds >= 3600:
        duration_str = f"~{total_seconds / 3600:.1f} hours"
    elif total_seconds >= 60:
        duration_str = f"~{total_seconds / 60:.0f} minutes"
    else:
        duration_str = f"~{total_seconds} seconds"

    count = len(segments)
    if count >= 5 and total_seconds >= 1800:
        return f"{count} segments, {duration_str}"

    plural = "" if count == 1 else "s"
    return f"Light activity: {count} segment{plural}, {duration_str}"
613
614
def setup_cli(parser: argparse.ArgumentParser, *, parse_known: bool = False):
    """Parse command line arguments, configure logging and load config env.

    The parser is extended with ``-v``/``--verbose`` and ``-d``/``--debug``
    flags; ``--debug`` selects DEBUG logging, ``--verbose`` selects INFO, and
    WARNING is used otherwise. The journal path is then resolved via
    ``get_journal()`` (which creates it if needed), and every entry in the
    ``env`` section of the journal config is written into ``os.environ`` as a
    string, overwriting any value already set — journal.json is the strict
    source for these keys. A missing or non-mapping ``env`` section is
    ignored.

    Parameters
    ----------
    parser:
        Parser to extend and run against ``sys.argv``.
    parse_known:
        When True, use :func:`argparse.ArgumentParser.parse_known_args` and
        return ``(args, extra)``; otherwise return only ``args``.
    """
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose output"
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="Enable debug logging"
    )
    if parse_known:
        args, extra = parser.parse_known_args()
    else:
        args, extra = parser.parse_args(), None

    if args.debug:
        log_level = logging.DEBUG
    elif args.verbose:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING

    logging.basicConfig(level=log_level)

    # Initialize journal path (auto-creates if needed)
    get_journal()

    # Load config env from journal.json — strict source for API keys.
    # Tolerate `"env": null` or a non-dict value instead of crashing on .items().
    config = get_config()
    env_section = config.get("env") if isinstance(config, dict) else None
    if isinstance(env_section, dict):
        for key, value in env_section.items():
            os.environ[key] = str(value)

    return (args, extra) if parse_known else args
655
656
def parse_time_range(text: str) -> Optional[tuple[str, str, str]]:
    """Turn a natural-language time range into ``(day, start, end)`` strings.

    The text is handed to ``timefhuman``; the result is accepted only when it
    is a single two-element tuple whose endpoints fall on the same date.

    Parameters
    ----------
    text:
        Natural language description of a time range.

    Returns
    -------
    tuple[str, str, str] | None
        ``(day, start, end)`` with ``day`` as ``YYYYMMDD`` and ``start``/``end``
        as ``HHMMSS``. ``None`` when parsing fails or the result is not a
        single same-day range.
    """
    try:
        parsed = timefhuman(text)
    except Exception as exc:  # pragma: no cover - unexpected library failure
        logging.info("timefhuman failed for %s: %s", text, exc)
        return None

    logging.debug("timefhuman(%s) -> %r", text, parsed)

    if len(parsed) != 1:
        logging.info("timefhuman did not return a single expression for %s", text)
        return None

    candidate = parsed[0]
    is_pair = isinstance(candidate, tuple) and len(candidate) == 2
    if not is_pair:
        logging.info("Expected a range from %s but got %r", text, candidate)
        return None

    begin, finish = candidate
    if begin.date() != finish.date():
        logging.info("Range must be within a single day: %s -> %s", begin, finish)
        return None

    return (
        begin.strftime("%Y%m%d"),
        begin.strftime("%H%M%S"),
        finish.strftime("%H%M%S"),
    )
699
700
def get_raw_file(day: str, name: str) -> tuple[str, str, Any]:
    """Look up the raw media file referenced by a transcript.

    Parameters
    ----------
    day:
        Day folder in ``YYYYMMDD`` format.
    name:
        Transcript filename relative to the day directory, such as
        ``HHMMSS/audio.jsonl``, ``HHMMSS/monitor_1_diff.json``, or
        ``HHMMSS/screen.jsonl``.

    Returns
    -------
    tuple[str, str, Any]
        ``(path, mime_type, metadata)``. ``path`` is the ``"raw"`` value read
        from the transcript. ``mime_type`` is looked up from the suffix of
        ``path`` (``application/octet-stream`` if unknown). ``metadata`` is
        the list of parsed lines after the header for ``.jsonl`` files, or
        the whole parsed document otherwise; it may be left empty if reading
        fails part-way.

    Raises
    ------
    ValueError
        If no ``"raw"`` value could be read from the transcript.
    """
    transcript_path = day_path(day) / name

    rel = None
    meta: Any = {}

    try:
        with open(transcript_path, "r", encoding="utf-8") as f:
            if not name.endswith(".jsonl"):
                # Single JSON document (e.g., _diff.json) carrying "raw" itself.
                meta = json.load(f)
                rel = meta.get("raw")
            else:
                # JSONL: the first line is a header holding the "raw" field,
                # every following non-blank line is a metadata record.
                first_line = f.readline().strip()
                if first_line:
                    rel = json.loads(first_line).get("raw")
                meta = [json.loads(line) for line in f if line.strip()]
    except Exception:  # pragma: no cover - optional metadata
        logging.debug("Failed to read %s", transcript_path)

    if not rel:
        raise ValueError(f"No 'raw' field found in metadata for {name}")

    suffix = Path(rel).suffix.lower()
    # ".png" always maps to image/png, taking precedence over MIME_TYPES.
    if suffix == ".png":
        mime = "image/png"
    else:
        mime = MIME_TYPES.get(suffix, "application/octet-stream")

    return rel, mime, meta
752
753
754# =============================================================================
755# SOL_* Environment Variable Helpers
756# =============================================================================
757
758
def get_sol_day() -> str | None:
    """Return the ``SOL_DAY`` environment variable, or None if unset or empty."""
    value = os.environ.get("SOL_DAY")
    return value if value else None
762
763
def get_sol_facet() -> str | None:
    """Return the ``SOL_FACET`` environment variable, or None if unset or empty."""
    value = os.environ.get("SOL_FACET")
    return value if value else None
767
768
def get_sol_segment() -> str | None:
    """Return the ``SOL_SEGMENT`` environment variable, or None if unset or empty."""
    value = os.environ.get("SOL_SEGMENT")
    return value if value else None
772
773
def get_sol_stream() -> str | None:
    """Return the ``SOL_STREAM`` environment variable, or None if unset or empty."""
    value = os.environ.get("SOL_STREAM")
    return value if value else None
777
778
def get_sol_activity() -> str | None:
    """Return the ``SOL_ACTIVITY`` environment variable, or None if unset or empty."""
    value = os.environ.get("SOL_ACTIVITY")
    return value if value else None
782
783
def resolve_sol_day(arg: str | None) -> str:
    """Resolve the day for a CLI command from *arg* or the SOL_DAY variable.

    A non-empty *arg* wins; otherwise SOL_DAY is consulted. When neither
    supplies a value, an error is echoed to stderr and ``typer.Exit(1)`` is
    raised.
    """
    chosen = arg or get_sol_day()
    if chosen:
        return chosen

    import typer

    typer.echo("Error: day is required (pass as argument or set SOL_DAY).", err=True)
    raise typer.Exit(1)
799
800
def resolve_sol_facet(arg: str | None) -> str:
    """Resolve the facet for a CLI command from *arg* or the SOL_FACET variable.

    A non-empty *arg* wins; otherwise SOL_FACET is consulted. When neither
    supplies a value, an error is echoed to stderr and ``typer.Exit(1)`` is
    raised.
    """
    chosen = arg or get_sol_facet()
    if chosen:
        return chosen

    import typer

    typer.echo(
        "Error: facet is required (pass as argument or set SOL_FACET).", err=True
    )
    raise typer.Exit(1)
818
819
def resolve_sol_segment(arg: str | None) -> str | None:
    """Resolve the segment from *arg* or the SOL_SEGMENT variable, if any.

    Unlike :func:`resolve_sol_day`, a missing value is not an error: None is
    returned because the segment is typically optional.
    """
    return arg or get_sol_segment()
829
830
831# =============================================================================
832# Service Port Discovery
833# =============================================================================
834
835
def find_available_port(host: str = "127.0.0.1") -> int:
    """Find an available TCP port by binding an IPv4 socket to port 0.

    The OS assigns a free port, which is read back with ``getsockname``.
    The probe socket is always closed before returning, including when
    ``setsockopt`` or ``bind`` raises, so the port is not reserved for the
    caller afterwards.

    Args:
        host: Host address to bind to (default: 127.0.0.1)

    Returns:
        Available port number

    Raises:
        OSError: If the socket cannot be created or bound to ``host``.
    """
    import socket

    # The context manager closes the socket even when bind() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, 0))
        _, port = sock.getsockname()
    return port
856
857
def write_service_port(service: str, port: int) -> None:
    """Record a service's port in the journal's health directory.

    Writes the port number as text to ``health/{service}.port`` under the
    journal, creating the health directory if needed and replacing any
    existing file.

    Args:
        service: Service name (e.g., "convey", "cortex")
        port: Port number to write
    """
    health_dir = Path(get_journal(), "health")
    health_dir.mkdir(parents=True, exist_ok=True)
    (health_dir / f"{service}.port").write_text(str(port))
871
872
def read_service_port(service: str) -> int | None:
    """Read back a service's port from the journal's health directory.

    Args:
        service: Service name (e.g., "convey", "cortex")

    Returns:
        The integer stored in ``health/{service}.port``, or None when the
        file is missing or its content is not a valid integer.
    """
    port_file = Path(get_journal()).joinpath("health", f"{service}.port")
    try:
        raw = port_file.read_text()
        return int(raw.strip())
    except (FileNotFoundError, ValueError):
        return None