personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for think.streams module."""
5
6import threading
7
8import pytest
9
10from think.streams import (
11 get_stream_state,
12 list_streams,
13 read_segment_stream,
14 rebuild_stream_state,
15 stream_name,
16 update_stream,
17 write_segment_stream,
18)
19
20# --- stream_name tests ---
21
22
def test_stream_name_host():
    """A bare host name maps to a stream with that same name.

    This test is deliberately not called ``test_stream_name_observer``:
    another test in this module uses that name for the ``observer=``
    keyword, and two module-level functions with one name would leave
    only the later definition visible to pytest.
    """
    assert stream_name(host="archon") == "archon"
26
27
def test_stream_name_observer_tmux():
    """A host plus a qualifier is rendered as ``<host>.<qualifier>``."""
    name = stream_name(host="archon", qualifier="tmux")
    assert name == "archon.tmux"
31
32
def test_stream_name_observer():
    """A plain observer name maps to a stream with that same name."""
    name = stream_name(observer="laptop")
    assert name == "laptop"
36
37
def test_stream_name_import_apple():
    """The 'apple' import source is named ``import.apple``."""
    name = stream_name(import_source="apple")
    assert name == "import.apple"
41
42
def test_stream_name_import_text():
    """The 'text' import source is named ``import.text``."""
    name = stream_name(import_source="text")
    assert name == "import.text"
46
47
def test_stream_name_sanitization():
    """Whitespace, slashes and uppercase letters are normalized away."""
    # Raw host value -> expected sanitized stream name.
    host_cases = {
        "My Host": "my-host",
        "FOO/BAR": "foo-bar",
        " ARCHON ": "archon",
    }
    for raw_host, expected in host_cases.items():
        assert stream_name(host=raw_host) == expected

    # Observer names go through the same normalization.
    assert stream_name(observer="My Laptop") == "my-laptop"
54
55
def test_stream_name_hostname_stripping():
    """Host and observer names keep only their first label; IPs are dashed."""
    # Each entry pairs the keyword arguments for stream_name() with the
    # stream name those arguments are expected to produce.
    cases = [
        # Domain suffixes (.local, .home, .lan, ...) are dropped.
        ({"host": "ja1r.local"}, "ja1r"),
        ({"host": "archon.home"}, "archon"),
        ({"host": "server.corp.example.com"}, "server"),
        ({"observer": "phone.local"}, "phone"),
        # With a qualifier, the only dot left is the qualifier separator.
        ({"host": "ja1r.local", "qualifier": "tmux"}, "ja1r.tmux"),
        # Names without any suffix pass through untouched.
        ({"host": "archon"}, "archon"),
        ({"observer": "laptop"}, "laptop"),
        # IP addresses are kept whole, with dots turned into dashes.
        ({"host": "192.168.1.1"}, "192-168-1-1"),
        ({"host": "10.0.0.1"}, "10-0-0-1"),
    ]
    for kwargs, expected in cases:
        assert stream_name(**kwargs) == expected
74
75
def test_stream_name_validation():
    """A missing, empty or whitespace-only source raises ValueError."""
    # No source of any kind supplied.
    with pytest.raises(ValueError):
        stream_name()

    # Host that is already empty, then one that is whitespace only.
    for blank_host in ("", " "):
        with pytest.raises(ValueError):
            stream_name(host=blank_host)
86
87
88# --- update_stream tests ---
89
90
def test_update_stream_first_segment(tmp_path, monkeypatch):
    """The first segment has no predecessor, gets seq 1 and creates state."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    first = update_stream("archon", "20250119", "142500_300", type="observer")

    assert first["prev_day"] is None
    assert first["prev_segment"] is None
    assert first["seq"] == 1

    # The update must also have produced a readable stream state.
    saved = get_stream_state("archon")
    assert saved is not None
    expected_fields = {
        "name": "archon",
        "type": "observer",
        "last_day": "20250119",
        "last_segment": "142500_300",
        "seq": 1,
    }
    for field, value in expected_fields.items():
        assert saved[field] == value
109
110
def test_update_stream_subsequent(tmp_path, monkeypatch):
    """A second segment bumps seq and points back at the first one."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    day = "20250119"
    first_segment, second_segment = "142500_300", "143000_300"

    update_stream("archon", day, first_segment, type="observer")
    second = update_stream("archon", day, second_segment)

    assert second["prev_day"] == "20250119"
    assert second["prev_segment"] == "142500_300"
    assert second["seq"] == 2

    # Stored state reflects the most recent segment.
    saved = get_stream_state("archon")
    assert saved["seq"] == 2
    assert saved["last_segment"] == "143000_300"
125
126
def test_update_stream_cross_day(tmp_path, monkeypatch):
    """Across midnight, prev_day still refers to the previous day."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Last segment of one day, then the first segment of the next day.
    update_stream("archon", "20250119", "235500_300")
    after_midnight = update_stream("archon", "20250120", "000000_300")

    assert after_midnight["prev_day"] == "20250119"
    assert after_midnight["prev_segment"] == "235500_300"
    assert after_midnight["seq"] == 2
137
138
139# --- write/read segment stream tests ---
140
141
def test_write_read_segment_stream(tmp_path):
    """A marker written to a segment directory reads back unchanged."""
    segment_path = tmp_path / "20250119" / "default" / "142500_300"
    segment_path.mkdir(parents=True)

    write_segment_stream(segment_path, "archon", "20250119", "142000_300", 5)
    loaded = read_segment_stream(segment_path)

    assert loaded is not None
    expected_fields = {
        "stream": "archon",
        "prev_day": "20250119",
        "prev_segment": "142000_300",
        "seq": 5,
    }
    for field, value in expected_fields.items():
        assert loaded[field] == value
155
156
def test_write_segment_stream_first(tmp_path):
    """A first-segment marker stores None for both prev fields."""
    seg_dir = tmp_path / "20250119" / "default" / "142500_300"
    seg_dir.mkdir(parents=True)

    write_segment_stream(seg_dir, "archon", None, None, 1)

    marker = read_segment_stream(seg_dir)
    # Guard before subscripting so a missing marker fails with a clear
    # assertion instead of a TypeError on ``None[...]``.
    assert marker is not None
    assert marker["prev_day"] is None
    assert marker["prev_segment"] is None
    assert marker["seq"] == 1
168
169
def test_read_segment_stream_missing(tmp_path):
    """Reading a segment directory that has no marker yields None."""
    unmarked_segment = tmp_path / "20250119" / "default" / "142500_300"
    unmarked_segment.mkdir(parents=True)

    marker = read_segment_stream(unmarked_segment)
    assert marker is None
176
177
178# --- list_streams tests ---
179
180
def test_list_streams(tmp_path, monkeypatch):
    """Every stream that has been updated shows up in list_streams()."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # (stream name, day, segment, stream type) for each stream to register.
    registrations = [
        ("archon", "20250119", "142500_300", "observer"),
        ("laptop", "20250119", "142500_300", "observer"),
        ("import.apple", "20250119", "100000_300", "import"),
    ]
    for name, day, segment, kind in registrations:
        update_stream(name, day, segment, type=kind)

    streams = list_streams()
    listed_names = [entry["name"] for entry in streams]
    for name, _day, _segment, _kind in registrations:
        assert name in listed_names
    assert len(streams) == 3
195
196
197# --- rebuild_stream_state tests ---
198
199
def test_rebuild_stream_state(tmp_path, monkeypatch):
    """Stream state can be rebuilt purely from per-segment markers."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Two consecutive segments of the same day, each carrying a marker.
    stream_root = tmp_path / "20250119" / "default"
    first_seg = stream_root / "142500_300"
    second_seg = stream_root / "143000_300"
    for segment in (first_seg, second_seg):
        segment.mkdir(parents=True)

    write_segment_stream(first_seg, "archon", None, None, 1)
    write_segment_stream(second_seg, "archon", "20250119", "142500_300", 2)

    # Remove any existing state files so the rebuild starts from nothing.
    state_dir = tmp_path / "streams"
    stale_files = list(state_dir.glob("*.json")) if state_dir.exists() else []
    for stale in stale_files:
        stale.unlink()

    report = rebuild_stream_state()
    assert "archon" in report["rebuilt"]
    assert report["segments_scanned"] == 2

    # The rebuilt state must describe the latest segment.
    rebuilt = get_stream_state("archon")
    assert rebuilt is not None
    assert rebuilt["seq"] == 2
    assert rebuilt["last_segment"] == "143000_300"
230
231
232# --- atomicity test ---
233
234
def test_update_stream_atomicity(tmp_path, monkeypatch):
    """Concurrent update_stream calls leave a readable state file behind."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Exceptions raised inside worker threads are collected here, because
    # they would otherwise not propagate to the test.
    errors = []

    def writer(stream_id):
        # stream_id is accepted but unused: every worker writes the same
        # ten segments to the same "archon" stream.
        try:
            i = 0
            while i < 10:
                update_stream("archon", "20250119", f"{140000 + i}_300")
                i += 1
        except Exception as e:
            errors.append(e)

    workers = []
    for worker_index in range(4):
        workers.append(threading.Thread(target=writer, args=(worker_index,)))
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    assert not errors, f"Errors during concurrent writes: {errors}"

    # The state must still load and carry sane values.
    final_state = get_stream_state("archon")
    assert final_state is not None
    assert final_state["seq"] > 0
    assert final_state["name"] == "archon"