# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Stream identity for journal segments.
5
6A stream is a named series of segments from a single source. Every segment
7belongs to exactly one stream and links to its predecessor, creating a
8navigable chain with human-readable identity.
9
10Naming convention (separator is '.'):
11 Local observer: {hostname} e.g. "archon" (domain stripped: archon.local -> archon)
12 Local tmux: {hostname}.tmux e.g. "archon.tmux"
13 Observer: {observer_name} e.g. "laptop" (domain stripped: laptop.local -> laptop)
14 Import (Apple): import.apple
15 Import (Plaud): import.plaud
16 Import (generic): import.audio
17 Import (text): import.text
18
19Storage:
20 journal/streams/{name}.json - per-stream state (last segment, seq)
21 {segment_dir}/stream.json - per-segment marker (stream, prev, seq)
22"""
23
24from __future__ import annotations
25
26import json
27import logging
28import os
29import re
30import threading
31import time
32from pathlib import Path
33
34from think.utils import get_journal, iter_segments
35
36logger = logging.getLogger(__name__)
37
# Valid stream name: starts with a lowercase letter or digit; remainder may
# contain lowercase letters, digits, dots, underscores, and dashes. No path
# separators, so names are safe to use as file stems under journal/streams/.
_STREAM_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9._-]*$")
40
41
42def _strip_hostname(name: str) -> str:
43 """Strip domain suffix from a hostname, keeping only the first label.
44
45 Dots in stream names are reserved for qualifiers (e.g., '.tmux') and
46 import prefixes (e.g., 'import.apple'). Hostnames like 'ja1r.local'
47 or '192.168.1.1' must be reduced to a dot-free base name.
48
49 Examples: 'ja1r.local' -> 'ja1r', '192.168.1.1' -> '192-168-1-1',
50 'archon' -> 'archon', 'my.host.example.com' -> 'my'
51 """
52 name = name.strip()
53 if not name:
54 return name
55 # IP addresses: all parts are digits — join with dashes
56 parts = name.split(".")
57 if all(p.isdigit() for p in parts if p):
58 return "-".join(p for p in parts if p)
59 # Domain names: keep only the first label
60 return parts[0]
61
62
def stream_name(
    *,
    host: str | None = None,
    observer: str | None = None,
    import_source: str | None = None,
    qualifier: str | None = None,
) -> str:
    """Build the canonical stream name for a source.

    Exactly one of host, observer, or import_source must be provided;
    they are checked in that order of precedence.

    Parameters
    ----------
    host : str, optional
        Local hostname (e.g., "archon").
    observer : str, optional
        Observer name (e.g., "laptop").
    import_source : str, optional
        Import source type (e.g., "apple", "plaud", "audio", "text").
    qualifier : str, optional
        Sub-stream qualifier (e.g., "tmux"). Appended with dot separator.

    Returns
    -------
    str
        Canonical stream name.

    Raises
    ------
    ValueError
        If no source is provided, or the resulting name is invalid.
    """

    def _clean(text: str) -> str:
        # Lowercase, trim, and collapse whitespace/slashes into dashes.
        return re.sub(r"[\s/\\]+", "-", text.lower().strip())

    if host:
        base = _strip_hostname(host)
    elif observer:
        base = _strip_hostname(observer)
    elif import_source:
        base = f"import.{import_source}"
    else:
        raise ValueError("stream_name requires host, observer, or import_source")

    name = _clean(base)
    if qualifier:
        name = f"{name}.{_clean(qualifier)}"

    # The regex alone would accept consecutive dots, so reject them explicitly.
    if not name or ".." in name or not _STREAM_NAME_RE.match(name):
        raise ValueError(f"Invalid stream name: {name!r}")

    return name
120
121
def _streams_dir() -> Path:
    """Return the per-stream state directory under the journal, creating it on demand."""
    streams = Path(get_journal()) / "streams"
    streams.mkdir(parents=True, exist_ok=True)
    return streams
127
128
def get_stream_state(name: str) -> dict | None:
    """Load stream state from journal/streams/{name}.json.

    Uses EAFP (open and catch) rather than exists-then-open, so a file
    deleted between check and read cannot surface a spurious warning.

    Parameters
    ----------
    name : str
        Stream name (also the state file's stem).

    Returns
    -------
    dict or None
        Stream state dict, or None if the stream doesn't exist or its
        file is unreadable/corrupt (the latter logs a warning).
    """
    path = _streams_dir() / f"{name}.json"
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # Stream was never written — absent, not an error.
        return None
    except (json.JSONDecodeError, OSError) as exc:
        logger.warning("Failed to read stream state %s: %s", path, exc)
        return None
146
147
def update_stream(
    name: str,
    day: str,
    segment: str,
    *,
    type: str | None = None,
    host: str | None = None,
    platform: str | None = None,
) -> dict:
    """Read-modify-write of the stream state file.

    Creates the stream file on the first segment; otherwise increments
    seq and updates last_day/last_segment. The final write is atomic
    (unique tmp file + ``os.replace``), but the read-modify-write cycle
    itself is not locked — concurrent writers race and the last one wins.

    Parameters
    ----------
    name : str
        Stream name.
    day : str
        Day string (YYYYMMDD).
    segment : str
        Segment key (HHMMSS_LEN).
    type : str, optional
        Stream type (e.g., "observer", "import"). Shadows the builtin,
        kept for keyword-argument compatibility with existing callers.
    host : str, optional
        Hostname for the stream.
    platform : str, optional
        Platform string (e.g., "linux", "darwin").

    Returns
    -------
    dict
        ``{"prev_day": ..., "prev_segment": ..., "seq": N}`` where prev
        values are None for the first segment in a stream.
    """
    state_path = _streams_dir() / f"{name}.json"

    # Load existing state via EAFP; a corrupt or unreadable file is
    # treated the same as a missing one (stream restarts at seq 1).
    state: dict | None = None
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        pass
    except (json.JSONDecodeError, OSError):
        state = None

    if state is None:
        # First segment in stream.
        prev_day = None
        prev_segment = None
        seq = 1
        state = {
            "name": name,
            "type": type or "unknown",
            "host": host,
            "platform": platform,
            "created_at": int(time.time()),
            "last_day": day,
            "last_segment": segment,
            "seq": seq,
        }
    else:
        prev_day = state.get("last_day")
        prev_segment = state.get("last_segment")
        seq = state.get("seq", 0) + 1
        state["last_day"] = day
        state["last_segment"] = segment
        state["seq"] = seq

    # Metadata may be supplied on any call (e.g. first set after a rebuild).
    if type:
        state["type"] = type
    if host:
        state["host"] = host
    if platform:
        state["platform"] = platform

    # Atomic write: unique tmp name per process+thread so concurrent
    # writers never clobber each other's tmp file. os.replace (not
    # os.rename) so the overwrite is atomic on POSIX *and* Windows.
    tid = threading.get_ident()
    tmp_path = state_path.with_suffix(f".{os.getpid()}-{tid}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
            f.write("\n")
        os.replace(tmp_path, state_path)
    except Exception:
        # Don't leave a stray tmp file behind on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return {"prev_day": prev_day, "prev_segment": prev_segment, "seq": seq}
234
235
236def write_segment_stream(
237 segment_dir: str | Path,
238 stream: str,
239 prev_day: str | None,
240 prev_segment: str | None,
241 seq: int,
242) -> None:
243 """Write stream.json marker into a segment directory.
244
245 Parameters
246 ----------
247 segment_dir : str or Path
248 Path to the segment directory.
249 stream : str
250 Stream name.
251 prev_day : str or None
252 Previous segment's day (None for first segment).
253 prev_segment : str or None
254 Previous segment's key (None for first segment).
255 seq : int
256 Sequence number in stream.
257 """
258 marker = {
259 "stream": stream,
260 "prev_day": prev_day,
261 "prev_segment": prev_segment,
262 "seq": seq,
263 }
264 marker_path = Path(segment_dir) / "stream.json"
265 with open(marker_path, "w", encoding="utf-8") as f:
266 json.dump(marker, f)
267 f.write("\n")
268
269
270def read_segment_stream(segment_dir: str | Path) -> dict | None:
271 """Read stream.json from a segment directory.
272
273 Returns
274 -------
275 dict or None
276 Stream marker dict, or None if the file doesn't exist (pre-stream segments).
277 """
278 marker_path = Path(segment_dir) / "stream.json"
279 if not marker_path.exists():
280 return None
281 try:
282 with open(marker_path, "r", encoding="utf-8") as f:
283 return json.load(f)
284 except (json.JSONDecodeError, OSError) as exc:
285 logger.warning("Failed to read stream marker %s: %s", marker_path, exc)
286 return None
287
288
def list_streams() -> list[dict]:
    """Load every stream state file from journal/streams/.

    Unreadable or corrupt state files are logged and skipped.

    Returns
    -------
    list[dict]
        Stream state dicts, ordered by file name.
    """
    states: list[dict] = []
    for state_file in sorted(_streams_dir().glob("*.json")):
        try:
            with open(state_file, "r", encoding="utf-8") as fh:
                states.append(json.load(fh))
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("Failed to read stream %s: %s", state_file, exc)
    return states
307
308
def rebuild_stream_state(name: str | None = None) -> dict:
    """Reconstruct stream state from per-segment markers.

    Walks all day directories and reads stream.json from each segment,
    keeping the marker with the highest seq per stream, then rewrites
    the stream state file(s) atomically.

    Note: ``created_at`` is reset to rebuild time — segment markers do
    not record it. ``type``/``host``/``platform`` are likewise reset to
    their unknown/None defaults until the next live update.

    Parameters
    ----------
    name : str, optional
        If given, rebuild only this stream. Otherwise rebuild all.

    Returns
    -------
    dict
        Summary: ``{"rebuilt": ["stream1", ...], "segments_scanned": N}``
    """
    from think.utils import day_dirs

    streams: dict[str, dict] = {}  # name -> rebuilt state dict
    segments_scanned = 0

    for day in sorted(day_dirs().keys()):
        for _stream_name, seg_key, seg_dir in iter_segments(day):
            marker = read_segment_stream(seg_dir)
            if marker is None:
                continue

            stream_name_val = marker.get("stream")
            if not stream_name_val:
                continue

            # Skip if filtering to a specific stream.
            if name and stream_name_val != name:
                continue

            segments_scanned += 1
            seq = marker.get("seq", 0)

            existing = streams.get(stream_name_val)
            if existing is None:
                streams[stream_name_val] = {
                    "name": stream_name_val,
                    "type": "unknown",
                    "host": None,
                    "platform": None,
                    "created_at": int(time.time()),
                    "last_day": day,
                    "last_segment": seg_key,
                    "seq": seq,
                }
            elif seq > existing.get("seq", 0):
                # This segment is later in the chain — it becomes the head.
                existing["last_day"] = day
                existing["last_segment"] = seg_key
                existing["seq"] = seq

    # Write rebuilt state files atomically. Tmp name includes pid+thread
    # id (matching update_stream) so concurrent writers can't collide;
    # os.replace (not os.rename) overwrites atomically on Windows too.
    streams_dir = _streams_dir()
    tid = threading.get_ident()
    rebuilt: list[str] = []
    for sname, state in streams.items():
        state_path = streams_dir / f"{sname}.json"
        tmp_path = state_path.with_suffix(f".{os.getpid()}-{tid}.tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
            f.write("\n")
        os.replace(tmp_path, state_path)
        rebuilt.append(sname)

    return {"rebuilt": rebuilt, "segments_scanned": segments_scanned}
379
380
def main() -> None:
    """CLI entry point for sol streams."""
    import argparse

    from think.utils import setup_cli

    parser = argparse.ArgumentParser(description="Inspect and manage stream identity")
    parser.add_argument(
        "name",
        nargs="?",
        help="Stream name to inspect (omit to list all streams)",
    )
    parser.add_argument(
        "--rebuild",
        action="store_true",
        help="Reconstruct stream state from per-segment markers",
    )

    args = setup_cli(parser)

    # Rebuild mode: reconstruct state files, report what was rebuilt.
    if args.rebuild:
        summary = rebuild_stream_state(name=args.name)
        rebuilt_names = summary["rebuilt"]
        scanned_count = summary["segments_scanned"]
        if not rebuilt_names:
            print(f"No streams found ({scanned_count} segments scanned)")
        else:
            print(f"Rebuilt {len(rebuilt_names)} stream(s) from {scanned_count} segments:")
            for name in rebuilt_names:
                print(f"  {name}")
        return

    # Inspect mode: dump one stream's state as JSON.
    if args.name:
        state = get_stream_state(args.name)
        if state is None:
            print(f"Stream not found: {args.name}")
            raise SystemExit(1)
        print(json.dumps(state, indent=2))
        return

    # List mode: table of all streams.
    streams = list_streams()
    if not streams:
        print("No streams found")
        return

    print(f"{'Name':<24} {'Type':<12} {'Last Day':<10} {'Last Segment':<16} {'Seq':>5}")
    print("-" * 71)
    for entry in streams:
        fields = [
            f"{entry.get('name', '?'):<24}",
            f"{entry.get('type', '?'):<12}",
            f"{entry.get('last_day', '?'):<10}",
            f"{entry.get('last_segment', '?'):<16}",
            f"{entry.get('seq', 0):>5}",
        ]
        print(" ".join(fields))