personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for think.runner and logs tract integration."""
5
6import os
7
8import pytest
9
10from think.runner import ManagedProcess, run_task
11
12
@pytest.fixture
def journal_path(tmp_path):
    """Provide a temporary journal directory and point the journal override at it.

    Creates ``<tmp_path>/journal`` and exports its path through the
    ``_SOLSTONE_JOURNAL_OVERRIDE`` environment variable for the duration of
    the test, then yields the directory.

    On teardown the variable is put back to whatever it held before the
    fixture ran (or removed if it was unset), so a value exported by the
    surrounding environment is not clobbered by the test run.
    """
    env_key = "_SOLSTONE_JOURNAL_OVERRIDE"

    journal = tmp_path / "journal"
    journal.mkdir()

    # Remember any pre-existing value so teardown can restore it.
    previous = os.environ.get(env_key)
    os.environ[env_key] = str(journal)
    try:
        yield journal
    finally:
        if previous is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous
23
24
def test_managed_process_has_ref_and_pid(journal_path, mock_callosum):
    """A spawned ManagedProcess exposes a string ``ref``, an int ``pid`` and a ``name``."""
    proc = ManagedProcess.spawn(["echo", "test"])

    # ref must be populated and be a string
    assert proc.ref is not None
    assert isinstance(proc.ref, str)

    # pid must be a positive integer
    assert proc.pid > 0
    assert isinstance(proc.pid, int)

    # name is derived from cmd[0]
    assert proc.name == "echo"

    # Let the child finish, then release the process resources
    proc.wait()
    proc.cleanup()
39
40
def test_managed_process_uses_ref_as_ref(journal_path, mock_callosum):
    """A ``ref`` passed to ``spawn`` is used verbatim as the process ref."""
    supplied_ref = "1730476800123"
    proc = ManagedProcess.spawn(["echo", "test"], ref=supplied_ref)

    # The process reports exactly the ref the caller supplied
    assert proc.ref == supplied_ref
    assert proc.name == "echo"

    # Let the child finish, then release the process resources
    proc.wait()
    proc.cleanup()
53
54
def test_logs_tract_exec_event(journal_path, mock_callosum):
    """Test that exec event is emitted when process starts.

    The exec event is checked right after ``spawn`` returns, before the
    process is waited on. Cleanup of both the process and the listener is
    guarded with ``finally`` so a failing assertion does not leave a started
    listener or an un-reaped child behind for later tests.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        # Spawn process
        managed = ManagedProcess.spawn(["echo", "hello"])
        try:
            # Find exec event
            exec_events = [msg for msg in received if msg.get("event") == "exec"]
            assert len(exec_events) >= 1

            exec_event = exec_events[0]
            assert exec_event["tract"] == "logs"
            assert exec_event["event"] == "exec"
            assert exec_event["ref"] == managed.ref
            assert exec_event["name"] == "echo"
            assert exec_event["pid"] == managed.pid
            assert exec_event["cmd"] == ["echo", "hello"]
            assert "log_path" in exec_event
        finally:
            # Always reap the child, even when an assertion above failed
            managed.wait()
            managed.cleanup()
    finally:
        listener.stop()
83
84
def test_logs_tract_line_event(journal_path, mock_callosum):
    """Test that line events are emitted for stdout/stderr.

    The listener is stopped in a ``finally`` block so that a failing
    assertion (or a failure while waiting on the process) does not leave it
    running for later tests.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        # Spawn process that outputs text
        managed = ManagedProcess.spawn(["echo", "hello logs tract"])

        # Wait for process and cleanup threads before checking events
        managed.wait()
        managed.cleanup()

        # Find line events
        line_events = [msg for msg in received if msg.get("event") == "line"]
        assert len(line_events) >= 1

        # Verify line event structure
        line_event = line_events[0]
        assert line_event["tract"] == "logs"
        assert line_event["event"] == "line"
        assert line_event["ref"] == managed.ref
        assert line_event["name"] == "echo"
        assert line_event["pid"] == managed.pid
        assert line_event["stream"] in ["stdout", "stderr"]
        assert "line" in line_event
        assert "hello logs tract" in line_event["line"]
    finally:
        listener.stop()
117
118
def test_logs_tract_exit_event(journal_path, mock_callosum):
    """Test that exit event is emitted when process completes.

    The listener is stopped in a ``finally`` block so a failing assertion
    does not leave it running for later tests.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        # Spawn and wait for process
        managed = ManagedProcess.spawn(["echo", "test"])
        managed.wait()
        managed.cleanup()

        # Find exit event
        exit_events = [msg for msg in received if msg.get("event") == "exit"]
        assert len(exit_events) >= 1

        exit_event = exit_events[0]
        assert exit_event["tract"] == "logs"
        assert exit_event["event"] == "exit"
        assert exit_event["ref"] == managed.ref
        assert exit_event["name"] == "echo"
        assert exit_event["pid"] == managed.pid
        assert exit_event["exit_code"] == 0
        assert "duration_ms" in exit_event
        assert exit_event["duration_ms"] >= 0
        assert exit_event["cmd"] == ["echo", "test"]
        assert "log_path" in exit_event
    finally:
        listener.stop()
149
150
def test_logs_tract_all_events_have_common_fields(journal_path, mock_callosum):
    """Test that all logs tract events carry ref, name, pid, and ts.

    Every event on the ``logs`` tract must also agree with the spawned
    process on ``ref``, ``name`` and ``pid``. The listener is stopped in a
    ``finally`` block so a failing assertion does not leave it running.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        # Run a process
        managed = ManagedProcess.spawn(["echo", "test"])
        managed.wait()
        managed.cleanup()

        # Filter to only logs tract events
        logs_events = [msg for msg in received if msg.get("tract") == "logs"]
        assert len(logs_events) >= 3  # exec, line, exit

        # Verify common fields in all events
        for event in logs_events:
            assert "ref" in event
            assert "name" in event
            assert "pid" in event
            assert "ts" in event  # Auto-added by Callosum
            assert event["ref"] == managed.ref
            assert event["name"] == "echo"
            assert event["pid"] == managed.pid
    finally:
        listener.stop()
179
180
def test_run_task_emits_logs_tract_events(journal_path, mock_callosum):
    """Test that run_task function emits logs tract events.

    Checks the ``(success, exit_code, log_path)`` result of ``run_task`` and
    that exec, line and exit events were seen on the ``logs`` tract. The
    listener is stopped in a ``finally`` block so a failing assertion does
    not leave it running for later tests.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        # Run task
        success, exit_code, log_path = run_task(["echo", "run_task test"])

        # Verify success
        assert success is True
        assert exit_code == 0
        assert log_path.exists()

        # Verify events were emitted
        logs_events = [msg for msg in received if msg.get("tract") == "logs"]
        event_types = [msg["event"] for msg in logs_events]

        assert "exec" in event_types
        assert "line" in event_types
        assert "exit" in event_types
    finally:
        listener.stop()
206
207
def test_ref_links_to_task_tract(journal_path, mock_callosum):
    """Test that providing ref links logs to task tract.

    Every ``logs`` tract event emitted for the process must carry the ref
    that was passed to ``spawn``. The listener is stopped in a ``finally``
    block so a failing assertion does not leave it running.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        ref = "1730476800999"
        managed = ManagedProcess.spawn(["echo", "linked"], ref=ref)
        managed.wait()
        managed.cleanup()

        # Verify all logs events carry the supplied ref
        logs_events = [msg for msg in received if msg.get("tract") == "logs"]
        assert len(logs_events) >= 3

        for event in logs_events:
            assert event["ref"] == ref
    finally:
        listener.stop()
229
230
def test_error_exit_code_in_exit_event(journal_path, mock_callosum):
    """Test that non-zero exit codes are captured in exit event.

    Runs ``sh -c "exit 42"`` and checks that 42 is both returned by
    ``wait()`` and reported in the exit event. The listener is stopped in a
    ``finally`` block so a failing assertion does not leave it running.
    """
    from think.callosum import CallosumConnection

    received = []
    listener = CallosumConnection()
    listener.start(callback=lambda msg: received.append(msg))

    try:
        # Run process that exits with error
        managed = ManagedProcess.spawn(["sh", "-c", "exit 42"])
        exit_code = managed.wait()
        managed.cleanup()

        # Verify exit code
        assert exit_code == 42

        # Find exit event
        exit_events = [msg for msg in received if msg.get("event") == "exit"]
        assert len(exit_events) >= 1

        exit_event = exit_events[0]
        assert exit_event["exit_code"] == 42
    finally:
        listener.stop()
255
256
def test_process_creates_health_log(journal_path, mock_callosum):
    """Process output lands in today's health directory, with symlinks to it.

    Expects the log at ``<journal>/<YYYYMMDD>/health/{ref}_echo.log`` plus an
    ``echo.log`` symlink in both the day-level and the journal-level health
    directories, each resolving to that log file.
    """
    from datetime import datetime

    proc = ManagedProcess.spawn(["echo", "logged output"])
    ref = proc.ref
    proc.wait()
    proc.cleanup()

    # Log file uses the {ref}_{name}.log naming scheme
    day = datetime.now().strftime("%Y%m%d")
    day_health_dir = journal_path / day / "health"
    log_path = day_health_dir / f"{ref}_echo.log"

    assert log_path.exists()
    content = log_path.read_text()
    assert "logged output" in content

    # Day-level symlink, then journal-level symlink: both must point at the log
    symlinks = (
        day_health_dir / "echo.log",
        journal_path / "health" / "echo.log",
    )
    for link in symlinks:
        assert link.is_symlink()
        assert link.resolve() == log_path.resolve()
283
284
def test_process_day_override(journal_path, mock_callosum):
    """The ``day`` argument to ``spawn`` decides which day directory gets the log.

    The log and the day-level symlink must be under the requested day, the
    journal-level symlink must resolve to that log, and today's health
    directory must not contain the log (when today differs from the target).
    """
    from datetime import datetime

    target_day = "20240101"
    proc = ManagedProcess.spawn(["echo", "day test"], day=target_day)
    ref = proc.ref
    proc.wait()
    proc.cleanup()

    log_name = f"{ref}_echo.log"
    target_health_dir = journal_path / target_day / "health"

    # The log belongs to the target day, not to today
    log_path = target_health_dir / log_name
    assert log_path.exists()
    content = log_path.read_text()
    assert "day test" in content

    # Nothing should have been written under today's health directory
    today = datetime.now().strftime("%Y%m%d")
    if today != target_day:
        today_log = journal_path / today / "health" / log_name
        assert not today_log.exists()

    # Day-level symlink in the target day, then the journal-level symlink
    symlinks = (
        target_health_dir / "echo.log",
        journal_path / "health" / "echo.log",
    )
    for link in symlinks:
        assert link.is_symlink()
        assert link.resolve() == log_path.resolve()
316
317
def test_run_task_day_override(journal_path, mock_callosum):
    """``run_task`` forwards ``day`` so the returned log path is under that day."""
    target_day = "20240201"
    cmd = ["echo", "task day test"]

    success, exit_code, log_path = run_task(cmd, day=target_day)

    # Task succeeded with a clean exit
    assert success
    assert exit_code == 0

    # The log exists and its path mentions the requested day
    assert target_day in str(log_path)
    assert log_path.exists()