personal memory agent
at main 260 lines 8.0 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for think.streams module."""

import threading

import pytest

from think.streams import (
    get_stream_state,
    list_streams,
    read_segment_stream,
    rebuild_stream_state,
    stream_name,
    update_stream,
    write_segment_stream,
)

# --- stream_name tests ---


# NOTE: this test must not share a name with the observer= test further down;
# a second module-level ``def`` with the same name rebinds it, and pytest then
# never collects the first definition.
def test_stream_name_host():
    """Host only -> hostname."""
    assert stream_name(host="archon") == "archon"


def test_stream_name_observer_tmux():
    """Host + qualifier -> host.tmux."""
    assert stream_name(host="archon", qualifier="tmux") == "archon.tmux"


def test_stream_name_observer():
    """Observer name -> observer name."""
    assert stream_name(observer="laptop") == "laptop"


def test_stream_name_import_apple():
    """import_source='apple' -> import.apple."""
    assert stream_name(import_source="apple") == "import.apple"


def test_stream_name_import_text():
    """import_source='text' -> import.text."""
    assert stream_name(import_source="text") == "import.text"


def test_stream_name_sanitization():
    """Spaces, slashes, uppercase are normalized."""
    assert stream_name(host="My Host") == "my-host"
    assert stream_name(host="FOO/BAR") == "foo-bar"
    assert stream_name(host=" ARCHON ") == "archon"
    assert stream_name(observer="My Laptop") == "my-laptop"


def test_stream_name_hostname_stripping():
    """Domain suffixes are stripped from hostnames and observer names."""
    # .local, .home, .lan etc — keep only first label
    assert stream_name(host="ja1r.local") == "ja1r"
    assert stream_name(host="archon.home") == "archon"
    assert stream_name(host="server.corp.example.com") == "server"
    assert stream_name(observer="phone.local") == "phone"

    # With qualifier — dot is for qualifier only
    assert stream_name(host="ja1r.local", qualifier="tmux") == "ja1r.tmux"

    # Simple hostnames unchanged
    assert stream_name(host="archon") == "archon"
    assert stream_name(observer="laptop") == "laptop"

    # IP addresses become dash-separated
    assert stream_name(host="192.168.1.1") == "192-168-1-1"
    assert stream_name(host="10.0.0.1") == "10-0-0-1"


def test_stream_name_validation():
    """Empty/invalid raises ValueError."""
    with pytest.raises(ValueError):
        stream_name()  # No source

    with pytest.raises(ValueError):
        stream_name(host="")  # Empty after strip

    with pytest.raises(ValueError):
        stream_name(host=" ")  # Whitespace only


# --- update_stream tests ---


def test_update_stream_first_segment(tmp_path, monkeypatch):
    """First segment creates state, prev=None, seq=1."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    result = update_stream("archon", "20250119", "142500_300", type="observer")

    assert result["prev_day"] is None
    assert result["prev_segment"] is None
    assert result["seq"] == 1

    # State file should exist
    state = get_stream_state("archon")
    assert state is not None
    assert state["name"] == "archon"
    assert state["type"] == "observer"
    assert state["last_day"] == "20250119"
    assert state["last_segment"] == "142500_300"
    assert state["seq"] == 1


def test_update_stream_subsequent(tmp_path, monkeypatch):
    """Subsequent segments increment seq and return correct prev."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    update_stream("archon", "20250119", "142500_300", type="observer")
    result = update_stream("archon", "20250119", "143000_300")

    assert result["prev_day"] == "20250119"
    assert result["prev_segment"] == "142500_300"
    assert result["seq"] == 2

    state = get_stream_state("archon")
    assert state["seq"] == 2
    assert state["last_segment"] == "143000_300"


def test_update_stream_cross_day(tmp_path, monkeypatch):
    """Prev points to different day when crossing midnight."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    update_stream("archon", "20250119", "235500_300")
    result = update_stream("archon", "20250120", "000000_300")

    assert result["prev_day"] == "20250119"
    assert result["prev_segment"] == "235500_300"
    assert result["seq"] == 2


# --- write/read segment stream tests ---


def test_write_read_segment_stream(tmp_path):
    """Round-trip write/read stream.json."""
    seg_dir = tmp_path / "20250119" / "default" / "142500_300"
    seg_dir.mkdir(parents=True)

    write_segment_stream(seg_dir, "archon", "20250119", "142000_300", 5)

    marker = read_segment_stream(seg_dir)
    assert marker is not None
    assert marker["stream"] == "archon"
    assert marker["prev_day"] == "20250119"
    assert marker["prev_segment"] == "142000_300"
    assert marker["seq"] == 5


def test_write_segment_stream_first(tmp_path):
    """First segment has None prev values."""
    seg_dir = tmp_path / "20250119" / "default" / "142500_300"
    seg_dir.mkdir(parents=True)

    write_segment_stream(seg_dir, "archon", None, None, 1)

    marker = read_segment_stream(seg_dir)
    assert marker["prev_day"] is None
    assert marker["prev_segment"] is None
    assert marker["seq"] == 1


def test_read_segment_stream_missing(tmp_path):
    """Returns None for pre-stream segments."""
    seg_dir = tmp_path / "20250119" / "default" / "142500_300"
    seg_dir.mkdir(parents=True)

    assert read_segment_stream(seg_dir) is None


# --- list_streams tests ---


def test_list_streams(tmp_path, monkeypatch):
    """Discovers all stream state files."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    update_stream("archon", "20250119", "142500_300", type="observer")
    update_stream("laptop", "20250119", "142500_300", type="observer")
    update_stream("import.apple", "20250119", "100000_300", type="import")

    streams = list_streams()
    names = [s["name"] for s in streams]
    assert "archon" in names
    assert "laptop" in names
    assert "import.apple" in names
    assert len(streams) == 3


# --- rebuild_stream_state tests ---


def test_rebuild_stream_state(tmp_path, monkeypatch):
    """Reconstructs state from segment markers."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Create segment dirs with stream markers under default stream
    day_dir = tmp_path / "20250119"
    seg1 = day_dir / "default" / "142500_300"
    seg2 = day_dir / "default" / "143000_300"
    seg1.mkdir(parents=True)
    seg2.mkdir(parents=True)

    write_segment_stream(seg1, "archon", None, None, 1)
    write_segment_stream(seg2, "archon", "20250119", "142500_300", 2)

    # Delete stream state files to simulate corruption
    streams_dir = tmp_path / "streams"
    if streams_dir.exists():
        for f in streams_dir.glob("*.json"):
            f.unlink()

    # Rebuild
    summary = rebuild_stream_state()
    assert "archon" in summary["rebuilt"]
    assert summary["segments_scanned"] == 2

    # Verify rebuilt state
    state = get_stream_state("archon")
    assert state is not None
    assert state["seq"] == 2
    assert state["last_segment"] == "143000_300"


# --- atomicity test ---


def test_update_stream_atomicity(tmp_path, monkeypatch):
    """Concurrent writes don't corrupt state file."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Exceptions raised inside worker threads are collected here, because an
    # exception in a thread does not fail the test on its own.
    errors = []

    def writer(stream_id):
        # stream_id only distinguishes the worker threads; every worker
        # updates the same "archon" stream so the writes contend with
        # each other.
        try:
            for i in range(10):
                update_stream("archon", "20250119", f"{140000 + i}_300")
        except Exception as e:
            errors.append(e)

    threads = [threading.Thread(target=writer, args=(i,)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    assert not errors, f"Errors during concurrent writes: {errors}"

    # State file should be valid JSON
    state = get_stream_state("archon")
    assert state is not None
    assert state["seq"] > 0
    assert state["name"] == "archon"