personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""CLI for viewing service health logs.
5
Usage:
    sol health logs                     Show last 5 lines from each service
    sol health logs -c 20               Show last 20 lines from each service
    sol health logs -f                  Follow all logs for new output
    sol health logs --since 30m         Lines from last 30 minutes
    sol health logs --service observer  Only show observer logs
    sol health logs --grep "error"      Lines matching regex "error"
13"""
14
15from __future__ import annotations
16
17import argparse
18import os
19import re
20import sys
21import time
22from datetime import datetime, timedelta
23from pathlib import Path
24from typing import NamedTuple
25
26from think.utils import get_journal, setup_cli
27
# ANSI SGR escape sequences wrapped around service headers when color is on.
_DIM = "\033[2m"  # SGR 2: faint / decreased intensity
_RESET = "\033[0m"  # SGR 0: reset all attributes
30
31
class LogLine(NamedTuple):
    """One successfully parsed line of a service health log."""

    # Parsed from the first 19 characters of the line (ISO format).
    timestamp: datetime
    # Text before the last ":" inside the first "[...]" group after the timestamp.
    service: str
    # Text after the last ":" inside that same "[...]" group.
    stream: str
    # Rest of the line, starting two characters past the closing "]".
    message: str
    # The whole line with trailing whitespace stripped.
    raw: str
38
39
def _service_header(service: str, use_color: bool) -> str:
    """Build the ``── name ──`` separator shown above a run of lines.

    Args:
        service: Service name to place between the rules.
        use_color: When true, wrap the header in the dim/reset escapes.

    Returns:
        The header text, dimmed only if ``use_color`` is set.
    """
    title = f"── {service} ──"
    return f"{_DIM}{title}{_RESET}" if use_color else title
45
46
def parse_log_line(line: str) -> LogLine | None:
    """Split one log line into timestamp, ``[service:stream]`` tag and message.

    The line must start with a 19-character ISO timestamp and contain, at or
    after column 19, a bracketed tag whose content has at least one ``:``.
    The tag is split on its *last* colon.

    Args:
        line: Raw line text; trailing whitespace is ignored.

    Returns:
        A ``LogLine``, or ``None`` if the line is shorter than 20 characters,
        the timestamp does not parse, or no well-formed tag is found.
    """
    text = line.rstrip()
    if len(text) < 20:
        return None

    try:
        when = datetime.fromisoformat(text[:19])
    except ValueError:
        return None

    # Locate the first "[...]" pair that begins at or after the timestamp.
    tag_open = text.find("[", 19)
    tag_close = text.find("]", tag_open + 1) if tag_open != -1 else -1
    if tag_close == -1:
        return None

    service, colon, stream = text[tag_open + 1 : tag_close].rpartition(":")
    if not colon:
        return None

    # Skip the "]" and the single separator character that follows it;
    # slicing past the end simply yields an empty message.
    return LogLine(when, service, stream, text[tag_close + 2 :], text)
75
76
def parse_since(spec: str) -> datetime:
    """Convert a ``--since`` value into a naive local ``datetime``.

    Two forms are accepted:

    * a relative offset: digits followed by ``m``, ``h`` or ``d``
      (minutes, hours, days), measured back from the current time;
    * a clock time placed on today's date: ``4pm``, ``4:30pm`` or ``16:00``
      (case-insensitive).

    Args:
        spec: The raw command-line value; surrounding whitespace is ignored.

    Returns:
        The resolved point in time.

    Raises:
        argparse.ArgumentTypeError: If ``spec`` matches neither form, or the
            relative offset is too large to represent.
    """
    spec = spec.strip()
    now = datetime.now()

    relative = re.fullmatch(r"(\d+)([mhd])", spec)
    if relative:
        unit_map = {"m": "minutes", "h": "hours", "d": "days"}
        try:
            amount = int(relative.group(1))
            return now - timedelta(**{unit_map[relative.group(2)]: amount})
        except (OverflowError, ValueError) as error:
            # argparse only converts ValueError/TypeError/ArgumentTypeError
            # into a usage message; an OverflowError from a huge offset would
            # otherwise surface as a traceback.
            raise argparse.ArgumentTypeError(
                f"Time offset too large: {spec!r}"
            ) from error

    spec_upper = spec.upper()
    for fmt in ("%I:%M%p", "%I%p", "%H:%M"):
        try:
            parsed = datetime.strptime(spec_upper, fmt)
        except ValueError:
            continue
        # strptime fills in 1900-01-01; move the clock time onto today.
        return parsed.replace(year=now.year, month=now.month, day=now.day)

    raise argparse.ArgumentTypeError(
        f"Invalid time: {spec!r}. Use e.g., 30m, 2h, 1d, 4pm, 16:00"
    )
99
100
def compile_grep(pattern: str) -> re.Pattern[str]:
    """Compile a ``--grep`` pattern for use as an argparse ``type=``.

    Args:
        pattern: Regular expression source text.

    Returns:
        The compiled pattern.

    Raises:
        argparse.ArgumentTypeError: If ``pattern`` is not a valid regex.
    """
    try:
        compiled = re.compile(pattern)
    except re.error as exc:
        message = f"Invalid Python regex: {pattern!r}: {exc}"
        raise argparse.ArgumentTypeError(message) from exc
    return compiled
108
109
def get_today_health_dir() -> Path | None:
    """Locate ``<journal>/<YYYYMMDD>/health`` for the current local date.

    Returns:
        The directory path, or ``None`` when it does not exist or is not a
        directory.
    """
    journal_root = Path(os.path.expanduser(get_journal()))
    day_stamp = datetime.now().strftime("%Y%m%d")
    candidate = journal_root / day_stamp / "health"
    if not candidate.is_dir():
        return None
    return candidate
115
116
def get_day_log_files(health_dir: Path) -> list[Path]:
    """List the symlinked ``*.log`` entries directly inside ``health_dir``.

    Regular (non-symlink) files are ignored.

    Args:
        health_dir: Directory to scan (not recursive).

    Returns:
        The matching paths in sorted order.
    """
    symlinked = [entry for entry in health_dir.glob("*.log") if entry.is_symlink()]
    symlinked.sort()
    return symlinked
119
120
def tail_lines(path: Path, n: int) -> list[str]:
    """Return the last ``n`` lines of a text file, reading it whole.

    Args:
        path: File to read.
        n: Number of trailing lines to keep; ``0`` means every line.

    Returns:
        The selected lines without line terminators, or an empty list if the
        file cannot be read.
    """
    try:
        # Undecodable bytes become U+FFFD rather than raising
        # UnicodeDecodeError (which the OSError handler would not catch), so
        # one corrupt byte cannot abort the whole listing.
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []
    lines = text.splitlines()
    return lines[-n:] if n else lines
127
128
def tail_lines_large(path: Path, n: int, chunk_size: int = 65536) -> list[str]:
    """Return the last ``n`` lines of a file without reading all of it.

    The file is read backwards in ``chunk_size``-byte blocks until enough
    newlines have been seen. The collected bytes are then decoded and split
    once, so a line or multibyte character that straddles a block boundary
    stays intact.

    Args:
        path: File to read.
        n: Number of trailing lines to keep. ``0`` returns every line (the
            whole file is read); a negative value returns an empty list.
        chunk_size: Bytes read per backward step; must be at least 1.

    Returns:
        The selected lines without line terminators, with undecodable bytes
        replaced by U+FFFD. Empty if the file is empty or cannot be read.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    if n < 0:
        return []

    blocks: list[bytes] = []  # collected newest-first
    newline_count = 0
    try:
        with path.open("rb") as f:
            f.seek(0, 2)  # whence=2: jump to end of file to learn its size
            remaining = f.tell()
            # More than n newlines guarantees n complete lines after the
            # first newline, even when the file ends with a trailing one.
            while remaining > 0 and (not n or newline_count <= n):
                read_size = min(chunk_size, remaining)
                remaining -= read_size
                f.seek(remaining)
                block = f.read(read_size)
                newline_count += block.count(b"\n")
                blocks.append(block)
    except OSError:
        return []

    data = b"".join(reversed(blocks))
    lines = data.decode("utf-8", errors="replace").splitlines()
    return lines[-n:] if n else lines
147 return []
148
149
def _matches_filters(line: LogLine, args: argparse.Namespace) -> bool:
    """Report whether ``line`` passes every filter that is set on ``args``.

    A filter that is unset (falsy) is ignored. The checks, in order:

    * ``args.since``: the line's timestamp must not be earlier;
    * ``args.service``: the service name must match exactly;
    * ``args.grep``: the compiled pattern must be found in the raw line.

    Returns:
        ``True`` if nothing rejects the line, otherwise ``False``.
    """
    return not (
        (args.since and line.timestamp < args.since)
        or (args.service and line.service != args.service)
        or (args.grep and not args.grep.search(line.raw))
    )
158
159
def collect_and_print(args: argparse.Namespace) -> None:
    """Gather log lines once, merge them by time and print them.

    Without filters, the last ``args.c`` lines of each of today's service
    logs are shown, plus the last ``args.c`` lines of
    ``<journal>/health/supervisor.log``. With any of ``--since``,
    ``--service`` or ``--grep`` set, today's service logs are read in full
    and filtered, the supervisor log is skipped, and (when ``args.c`` is
    non-zero) only the last ``args.c`` matching lines overall are kept.

    Lines that do not parse are dropped. When stdout is a terminal, a header
    is printed each time the service changes between consecutive lines.
    """
    filtered = args.since or args.service or args.grep
    collected: list[LogLine] = []

    day_dir = get_today_health_dir()
    if day_dir:
        # 0 asks tail_lines for the whole file so filters see every line.
        per_file_limit = 0 if filtered else args.c
        for log_file in get_day_log_files(day_dir):
            for text in tail_lines(log_file, per_file_limit):
                entry = parse_log_line(text)
                if entry and _matches_filters(entry, args):
                    collected.append(entry)

    if not filtered:
        journal_root = Path(os.path.expanduser(get_journal()))
        supervisor_log = journal_root / "health" / "supervisor.log"
        if supervisor_log.exists():
            for text in tail_lines_large(supervisor_log, args.c):
                entry = parse_log_line(text)
                if entry:
                    collected.append(entry)

    # Stable sort: lines sharing a timestamp keep their collection order.
    collected.sort(key=lambda entry: entry.timestamp)
    if filtered and args.c:
        collected = collected[-args.c :]

    colorize = sys.stdout.isatty()
    previous_service = None
    for entry in collected:
        if colorize and entry.service != previous_service:
            if previous_service is not None:
                print()
            print(_service_header(entry.service, colorize))
            previous_service = entry.service
        print(entry.raw)
198 print(line.raw)
199
200
def follow_logs(args: argparse.Namespace) -> None:
    """Print lines appended to the ``*.log`` entries under ``<journal>/health``.

    Each log is opened at its current end, so only output written after
    startup is shown. About every two seconds the tracked symlinks are
    re-resolved; when one points at a new target, the old handle is drained
    and closed and the new target is read from its beginning. The same
    periodic scan picks up logs that appear later (opened at their end).
    Runs until interrupted with Ctrl-C.

    A line is printed only once its terminating newline has been written, so
    a line caught mid-write is not split in two. Undecodable bytes are shown
    as U+FFFD instead of aborting the loop.

    Args:
        args: Parsed CLI namespace. Currently unused: ``--since``,
            ``--service`` and ``--grep`` are not applied in follow mode.
    """
    journal = Path(os.path.expanduser(get_journal()))
    health_dir = journal / "health"
    if not health_dir.is_dir():
        print("No health directory found.", file=sys.stderr)
        return

    last_service = None
    use_color = sys.stdout.isatty()
    # Log path -> (resolved target at open time, open text handle).
    tracked: dict[Path, tuple[Path | None, object]] = {}
    # Log path -> text read so far of a line whose newline has not arrived.
    pending: dict[Path, str] = {}

    def open_log(target: Path):
        # errors="replace": a stray non-UTF-8 byte must not kill the follower.
        return open(target, "r", encoding="utf-8", errors="replace")

    def open_logs() -> None:
        """Start tracking every ``*.log`` entry not tracked yet, from its end."""
        for log_path in sorted(health_dir.glob("*.log")):
            if log_path in tracked:
                continue
            try:
                resolved = log_path.resolve()
                fh = open_log(resolved)
            except OSError:
                continue
            try:
                fh.seek(0, 2)  # whence=2: position at end of file
            except OSError:
                fh.close()  # do not leak the handle if positioning fails
                continue
            tracked[log_path] = (resolved, fh)

    def emit(line: str) -> None:
        """Print one complete line, preceded by a header on service change."""
        nonlocal last_service
        if not line:
            return
        parsed = parse_log_line(line)
        current_service = parsed.service if parsed else None
        if use_color and current_service and current_service != last_service:
            if last_service is not None:
                print(flush=True)
            print(_service_header(current_service, use_color), flush=True)
            last_service = current_service
        print(line, flush=True)

    def drain(log_path: Path, fh) -> None:
        """Emit every complete line currently readable from ``fh``."""
        while True:
            chunk = fh.readline()
            if not chunk:
                return
            if not chunk.endswith("\n"):
                # Hit end-of-file mid-line: hold the fragment until the
                # writer finishes the line.
                pending[log_path] = pending.get(log_path, "") + chunk
                return
            emit((pending.pop(log_path, "") + chunk).rstrip("\n"))

    open_logs()

    if not tracked:
        print("No log files found.", file=sys.stderr)
        return

    last_check = time.monotonic()
    try:
        while True:
            for log_path, (_, fh) in tracked.items():
                drain(log_path, fh)

            now = time.monotonic()
            if now - last_check >= 2.0:
                last_check = now
                for log_path in list(tracked):
                    if not log_path.is_symlink():
                        continue
                    new_target = log_path.resolve()
                    old_target, fh = tracked[log_path]
                    if new_target == old_target:
                        continue
                    # Finish the old file before switching, including any
                    # final line that never received its newline.
                    drain(log_path, fh)
                    emit(pending.pop(log_path, ""))
                    fh.close()
                    try:
                        tracked[log_path] = (new_target, open_log(new_target))
                    except OSError:
                        # Target not readable yet; open_logs() retries later.
                        del tracked[log_path]
                open_logs()

            time.sleep(0.2)
    except KeyboardInterrupt:
        pass
    finally:
        for _, fh in tracked.values():
            try:
                fh.close()
            except OSError:
                pass
277 pass
278
279
def _non_negative_int(value: str) -> int:
    """argparse ``type=`` callable that accepts only integers >= 0.

    A negative count would turn the ``lines[-n:]`` tail slices into "drop the
    first n lines", so it is rejected at parse time.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is not an integer or is
            negative.
    """
    try:
        count = int(value)
    except ValueError:
        # Same wording argparse itself uses for a plain ``type=int``.
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if count < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {count}")
    return count


def main() -> None:
    """Entry point: build the argument parser and run the chosen mode.

    ``-f`` selects follow mode; otherwise the logs are collected and printed
    once. Parsing is delegated to the project helper ``setup_cli`` — it is
    used here as returning the parsed namespace (assumed; confirm against
    ``think.utils``).
    """
    parser = argparse.ArgumentParser(
        description="View service health logs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-c",
        type=_non_negative_int,
        default=5,
        metavar="N",
        help="number of lines per log (default: 5)",
    )
    parser.add_argument(
        "-f",
        action="store_true",
        help="follow logs for new output",
    )
    parser.add_argument(
        "--since",
        type=parse_since,
        metavar="TIME",
        help="show lines since TIME (e.g., 30m, 2h, 4pm, 16:00)",
    )
    parser.add_argument(
        "--service",
        metavar="NAME",
        help="filter to a specific service",
    )
    parser.add_argument(
        "--grep",
        type=compile_grep,
        metavar="PATTERN",
        help="filter lines matching Python regex PATTERN",
    )
    args = setup_cli(parser)

    if args.f:
        follow_logs(args)
    else:
        collect_and_print(args)


if __name__ == "__main__":
    main()