# personal memory agent — stream identity module
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Stream identity for journal segments. 5 6A stream is a named series of segments from a single source. Every segment 7belongs to exactly one stream and links to its predecessor, creating a 8navigable chain with human-readable identity. 9 10Naming convention (separator is '.'): 11 Local observer: {hostname} e.g. "archon" (domain stripped: archon.local -> archon) 12 Local tmux: {hostname}.tmux e.g. "archon.tmux" 13 Observer: {observer_name} e.g. "laptop" (domain stripped: laptop.local -> laptop) 14 Import (Apple): import.apple 15 Import (Plaud): import.plaud 16 Import (generic): import.audio 17 Import (text): import.text 18 19Storage: 20 journal/streams/{name}.json - per-stream state (last segment, seq) 21 {segment_dir}/stream.json - per-segment marker (stream, prev, seq) 22""" 23 24from __future__ import annotations 25 26import json 27import logging 28import os 29import re 30import threading 31import time 32from pathlib import Path 33 34from think.utils import get_journal, iter_segments 35 36logger = logging.getLogger(__name__) 37 38# Valid stream name: lowercase, dots allowed, no path separators 39_STREAM_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9._-]*$") 40 41 42def _strip_hostname(name: str) -> str: 43 """Strip domain suffix from a hostname, keeping only the first label. 44 45 Dots in stream names are reserved for qualifiers (e.g., '.tmux') and 46 import prefixes (e.g., 'import.apple'). Hostnames like 'ja1r.local' 47 or '192.168.1.1' must be reduced to a dot-free base name. 
48 49 Examples: 'ja1r.local' -> 'ja1r', '192.168.1.1' -> '192-168-1-1', 50 'archon' -> 'archon', 'my.host.example.com' -> 'my' 51 """ 52 name = name.strip() 53 if not name: 54 return name 55 # IP addresses: all parts are digits — join with dashes 56 parts = name.split(".") 57 if all(p.isdigit() for p in parts if p): 58 return "-".join(p for p in parts if p) 59 # Domain names: keep only the first label 60 return parts[0] 61 62 63def stream_name( 64 *, 65 host: str | None = None, 66 observer: str | None = None, 67 import_source: str | None = None, 68 qualifier: str | None = None, 69) -> str: 70 """Derive canonical stream name from source characteristics. 71 72 Exactly one of host, observer, or import_source must be provided. 73 74 Parameters 75 ---------- 76 host : str, optional 77 Local hostname (e.g., "archon"). 78 observer : str, optional 79 Observer name (e.g., "laptop"). 80 import_source : str, optional 81 Import source type (e.g., "apple", "plaud", "audio", "text"). 82 qualifier : str, optional 83 Sub-stream qualifier (e.g., "tmux"). Appended with dot separator. 84 85 Returns 86 ------- 87 str 88 Canonical stream name. 89 90 Raises 91 ------ 92 ValueError 93 If no source is provided, or the resulting name is invalid. 94 """ 95 if host: 96 base = _strip_hostname(host) 97 elif observer: 98 base = _strip_hostname(observer) 99 elif import_source: 100 base = f"import.{import_source}" 101 else: 102 raise ValueError("stream_name requires host, observer, or import_source") 103 104 # Sanitize: lowercase, replace spaces/slashes with dash, strip 105 name = base.lower().strip() 106 name = re.sub(r"[\s/\\]+", "-", name) 107 108 if qualifier: 109 qualifier = qualifier.lower().strip() 110 qualifier = re.sub(r"[\s/\\]+", "-", qualifier) 111 name = f"{name}.{qualifier}" 112 113 # Validate 114 if not name or ".." 
in name: 115 raise ValueError(f"Invalid stream name: {name!r}") 116 if not _STREAM_NAME_RE.match(name): 117 raise ValueError(f"Invalid stream name: {name!r}") 118 119 return name 120 121 122def _streams_dir() -> Path: 123 """Return the streams state directory, creating it if needed.""" 124 d = Path(get_journal()) / "streams" 125 d.mkdir(parents=True, exist_ok=True) 126 return d 127 128 129def get_stream_state(name: str) -> dict | None: 130 """Load stream state from journal/streams/{name}.json. 131 132 Returns 133 ------- 134 dict or None 135 Stream state dict, or None if the stream doesn't exist. 136 """ 137 path = _streams_dir() / f"{name}.json" 138 if not path.exists(): 139 return None 140 try: 141 with open(path, "r", encoding="utf-8") as f: 142 return json.load(f) 143 except (json.JSONDecodeError, OSError) as exc: 144 logger.warning("Failed to read stream state %s: %s", path, exc) 145 return None 146 147 148def update_stream( 149 name: str, 150 day: str, 151 segment: str, 152 *, 153 type: str | None = None, 154 host: str | None = None, 155 platform: str | None = None, 156) -> dict: 157 """Atomic read-modify-write of stream state file. 158 159 Creates the stream file on first segment. Increments seq and updates 160 last_day/last_segment. 161 162 Parameters 163 ---------- 164 name : str 165 Stream name. 166 day : str 167 Day string (YYYYMMDD). 168 segment : str 169 Segment key (HHMMSS_LEN). 170 type : str, optional 171 Stream type (e.g., "observer", "import"). 172 host : str, optional 173 Hostname for the stream. 174 platform : str, optional 175 Platform string (e.g., "linux", "darwin"). 176 177 Returns 178 ------- 179 dict 180 ``{"prev_day": ..., "prev_segment": ..., "seq": N}`` where prev 181 values are None for the first segment in a stream. 
182 """ 183 streams_dir = _streams_dir() 184 state_path = streams_dir / f"{name}.json" 185 186 # Read existing state 187 state = None 188 if state_path.exists(): 189 try: 190 with open(state_path, "r", encoding="utf-8") as f: 191 state = json.load(f) 192 except (json.JSONDecodeError, OSError): 193 state = None 194 195 if state is None: 196 # First segment in stream 197 state = { 198 "name": name, 199 "type": type or "unknown", 200 "host": host, 201 "platform": platform, 202 "created_at": int(time.time()), 203 "last_day": day, 204 "last_segment": segment, 205 "seq": 1, 206 } 207 prev_day = None 208 prev_segment = None 209 seq = 1 210 else: 211 prev_day = state.get("last_day") 212 prev_segment = state.get("last_segment") 213 seq = state.get("seq", 0) + 1 214 state["last_day"] = day 215 state["last_segment"] = segment 216 state["seq"] = seq 217 # Update type/host/platform if provided (may be set on first call only) 218 if type: 219 state["type"] = type 220 if host: 221 state["host"] = host 222 if platform: 223 state["platform"] = platform 224 225 # Atomic write: write to unique tmp file then rename 226 tid = threading.get_ident() 227 tmp_path = state_path.with_suffix(f".{os.getpid()}-{tid}.tmp") 228 with open(tmp_path, "w", encoding="utf-8") as f: 229 json.dump(state, f, indent=2) 230 f.write("\n") 231 os.rename(str(tmp_path), str(state_path)) 232 233 return {"prev_day": prev_day, "prev_segment": prev_segment, "seq": seq} 234 235 236def write_segment_stream( 237 segment_dir: str | Path, 238 stream: str, 239 prev_day: str | None, 240 prev_segment: str | None, 241 seq: int, 242) -> None: 243 """Write stream.json marker into a segment directory. 244 245 Parameters 246 ---------- 247 segment_dir : str or Path 248 Path to the segment directory. 249 stream : str 250 Stream name. 251 prev_day : str or None 252 Previous segment's day (None for first segment). 253 prev_segment : str or None 254 Previous segment's key (None for first segment). 
255 seq : int 256 Sequence number in stream. 257 """ 258 marker = { 259 "stream": stream, 260 "prev_day": prev_day, 261 "prev_segment": prev_segment, 262 "seq": seq, 263 } 264 marker_path = Path(segment_dir) / "stream.json" 265 with open(marker_path, "w", encoding="utf-8") as f: 266 json.dump(marker, f) 267 f.write("\n") 268 269 270def read_segment_stream(segment_dir: str | Path) -> dict | None: 271 """Read stream.json from a segment directory. 272 273 Returns 274 ------- 275 dict or None 276 Stream marker dict, or None if the file doesn't exist (pre-stream segments). 277 """ 278 marker_path = Path(segment_dir) / "stream.json" 279 if not marker_path.exists(): 280 return None 281 try: 282 with open(marker_path, "r", encoding="utf-8") as f: 283 return json.load(f) 284 except (json.JSONDecodeError, OSError) as exc: 285 logger.warning("Failed to read stream marker %s: %s", marker_path, exc) 286 return None 287 288 289def list_streams() -> list[dict]: 290 """List all stream state files from journal/streams/. 291 292 Returns 293 ------- 294 list[dict] 295 List of stream state dicts, sorted by name. 296 """ 297 streams_dir = _streams_dir() 298 result = [] 299 for path in sorted(streams_dir.glob("*.json")): 300 try: 301 with open(path, "r", encoding="utf-8") as f: 302 state = json.load(f) 303 result.append(state) 304 except (json.JSONDecodeError, OSError) as exc: 305 logger.warning("Failed to read stream %s: %s", path, exc) 306 return result 307 308 309def rebuild_stream_state(name: str | None = None) -> dict: 310 """Reconstruct stream state from per-segment markers. 311 312 Walks all day directories and reads stream.json from each segment. 313 Rebuilds the stream state file(s) from this data. 314 315 Parameters 316 ---------- 317 name : str, optional 318 If given, rebuild only this stream. Otherwise rebuild all. 
319 320 Returns 321 ------- 322 dict 323 Summary: ``{"rebuilt": ["stream1", ...], "segments_scanned": N}`` 324 """ 325 from think.utils import day_dirs 326 327 streams: dict[str, dict] = {} # name -> {last_day, last_segment, seq, ...} 328 segments_scanned = 0 329 330 for day in sorted(day_dirs().keys()): 331 for _stream_name, seg_key, seg_dir in iter_segments(day): 332 marker = read_segment_stream(seg_dir) 333 if marker is None: 334 continue 335 336 stream_name_val = marker.get("stream") 337 if not stream_name_val: 338 continue 339 340 # Skip if filtering to specific stream 341 if name and stream_name_val != name: 342 continue 343 344 segments_scanned += 1 345 seq = marker.get("seq", 0) 346 347 if stream_name_val not in streams: 348 streams[stream_name_val] = { 349 "name": stream_name_val, 350 "type": "unknown", 351 "host": None, 352 "platform": None, 353 "created_at": int(time.time()), 354 "last_day": day, 355 "last_segment": seg_key, 356 "seq": seq, 357 } 358 else: 359 existing = streams[stream_name_val] 360 # Update if this segment has a higher seq 361 if seq > existing.get("seq", 0): 362 existing["last_day"] = day 363 existing["last_segment"] = seg_key 364 existing["seq"] = seq 365 366 # Write rebuilt state files 367 streams_dir = _streams_dir() 368 rebuilt = [] 369 for sname, state in streams.items(): 370 state_path = streams_dir / f"{sname}.json" 371 tmp_path = state_path.with_suffix(f".{os.getpid()}.tmp") 372 with open(tmp_path, "w", encoding="utf-8") as f: 373 json.dump(state, f, indent=2) 374 f.write("\n") 375 os.rename(str(tmp_path), str(state_path)) 376 rebuilt.append(sname) 377 378 return {"rebuilt": rebuilt, "segments_scanned": segments_scanned} 379 380 381def main() -> None: 382 """CLI entry point for sol streams.""" 383 import argparse 384 385 from think.utils import setup_cli 386 387 parser = argparse.ArgumentParser(description="Inspect and manage stream identity") 388 parser.add_argument( 389 "name", 390 nargs="?", 391 help="Stream name to inspect 
(omit to list all streams)", 392 ) 393 parser.add_argument( 394 "--rebuild", 395 action="store_true", 396 help="Reconstruct stream state from per-segment markers", 397 ) 398 399 args = setup_cli(parser) 400 401 if args.rebuild: 402 summary = rebuild_stream_state(name=args.name) 403 rebuilt = summary["rebuilt"] 404 scanned = summary["segments_scanned"] 405 if rebuilt: 406 print(f"Rebuilt {len(rebuilt)} stream(s) from {scanned} segments:") 407 for name in rebuilt: 408 print(f" {name}") 409 else: 410 print(f"No streams found ({scanned} segments scanned)") 411 return 412 413 if args.name: 414 # Inspect single stream 415 state = get_stream_state(args.name) 416 if state is None: 417 print(f"Stream not found: {args.name}") 418 raise SystemExit(1) 419 print(json.dumps(state, indent=2)) 420 return 421 422 # List all streams 423 streams = list_streams() 424 if not streams: 425 print("No streams found") 426 return 427 428 # Table header 429 print(f"{'Name':<24} {'Type':<12} {'Last Day':<10} {'Last Segment':<16} {'Seq':>5}") 430 print("-" * 71) 431 for s in streams: 432 print( 433 f"{s.get('name', '?'):<24} " 434 f"{s.get('type', '?'):<12} " 435 f"{s.get('last_day', '?'):<10} " 436 f"{s.get('last_segment', '?'):<16} " 437 f"{s.get('seq', 0):>5}" 438 )