personal memory agent
at main 257 lines 7.8 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Tests for observer app event handlers.""" 5 6from __future__ import annotations 7 8import json 9 10import pytest 11 12from apps.events import EventContext 13from apps.observer.events import handle_observed, handle_transferred 14 15 16@pytest.fixture 17def observer_journal(tmp_path, monkeypatch): 18 """Create a temporary journal with a observer registered.""" 19 from convey import state 20 21 journal = tmp_path / "journal" 22 journal.mkdir() 23 24 # Set convey state (used by apps.utils for storage paths) 25 monkeypatch.setattr(state, "journal_root", str(journal)) 26 27 # Create observers directory 28 observers_dir = journal / "apps" / "observer" / "observers" 29 observers_dir.mkdir(parents=True) 30 31 # Create a test observer 32 observer_data = { 33 "key": "testkey123456789abcdef", 34 "name": "test-observer", 35 "created_at": 1704312000000, 36 "last_seen": None, 37 "last_segment": None, 38 "enabled": True, 39 "stats": { 40 "segments_received": 5, 41 "bytes_received": 1024, 42 }, 43 } 44 observer_path = observers_dir / "testkey1.json" 45 with open(observer_path, "w") as f: 46 json.dump(observer_data, f) 47 48 class Env: 49 def __init__(self): 50 self.journal = journal 51 self.observers_dir = observers_dir 52 self.observer_path = observer_path 53 54 return Env() 55 56 57class TestHandleObserved: 58 """Tests for handle_observed event handler.""" 59 60 def test_records_observed_for_observer(self, observer_journal): 61 """Handler records observed status for observer segment.""" 62 ctx = EventContext( 63 msg={ 64 "tract": "observe", 65 "event": "observed", 66 "observer": "test-observer", 67 "segment": "120000_300", 68 "day": "20250103", 69 }, 70 app="observer", 71 tract="observe", 72 event="observed", 73 ) 74 75 handle_observed(ctx) 76 77 # Check history was written 78 hist_path = ( 79 observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 80 ) 81 assert hist_path.exists() 82 83 with open(hist_path) as f: 84 record = json.loads(f.readline()) 85 86 assert record["type"] == "observed" 87 assert record["segment"] == "120000_300" 88 assert "ts" in record 89 90 # Check stat was incremented 91 with open(observer_journal.observer_path) as f: 92 data = json.load(f) 93 assert data["stats"]["segments_observed"] == 1 94 95 def test_multiple_observed_events(self, observer_journal): 96 """Handler appends multiple observed records.""" 97 for segment in ["120000_300", "130000_300", "140000_300"]: 98 ctx = EventContext( 99 msg={ 100 "tract": "observe", 101 "event": "observed", 102 "observer": "test-observer", 103 "segment": segment, 104 "day": "20250103", 105 }, 106 app="observer", 107 tract="observe", 108 event="observed", 109 ) 110 handle_observed(ctx) 111 112 # Check all records written 113 hist_path = ( 114 observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 115 ) 116 with open(hist_path) as f: 117 lines = f.readlines() 118 119 assert len(lines) == 3 120 assert json.loads(lines[0])["segment"] == "120000_300" 121 assert json.loads(lines[1])["segment"] == "130000_300" 122 assert json.loads(lines[2])["segment"] == "140000_300" 123 124 # Check stat incremented 3 times 125 with open(observer_journal.observer_path) as f: 126 data = json.load(f) 127 assert data["stats"]["segments_observed"] == 3 128 129 def test_ignores_non_observer_events(self, observer_journal): 130 """Handler ignores events without observer field.""" 131 ctx = EventContext( 132 msg={ 133 "tract": "observe", 134 "event": "observed", 135 "segment": "120000_300", 136 "day": "20250103", 137 }, 138 app="observer", 139 tract="observe", 140 event="observed", 141 ) 142 143 handle_observed(ctx) 144 145 # No history should be created 146 hist_dir = observer_journal.observers_dir / "testkey1" / "hist" 147 assert not hist_dir.exists() 148 149 def test_ignores_unknown_observer(self, observer_journal): 150 """Handler ignores events for unknown observers.""" 151 ctx = EventContext( 152 msg={ 153 "tract": "observe", 154 "event": "observed", 155 "observer": "unknown-observer", 156 "segment": "120000_300", 157 "day": "20250103", 158 }, 159 app="observer", 160 tract="observe", 161 event="observed", 162 ) 163 164 handle_observed(ctx) 165 166 # No history should be created for unknown observer 167 hist_dir = observer_journal.observers_dir / "testkey1" / "hist" 168 assert not hist_dir.exists() 169 170 def test_handles_missing_segment(self, observer_journal): 171 """Handler handles events missing segment field.""" 172 ctx = EventContext( 173 msg={ 174 "tract": "observe", 175 "event": "observed", 176 "observer": "test-observer", 177 "day": "20250103", 178 }, 179 app="observer", 180 tract="observe", 181 event="observed", 182 ) 183 184 # Should not raise 185 handle_observed(ctx) 186 187 # No history should be created 188 hist_dir = observer_journal.observers_dir / "testkey1" / "hist" 189 assert not hist_dir.exists() 190 191 def test_handles_missing_day(self, observer_journal): 192 """Handler handles events missing day field.""" 193 ctx = EventContext( 194 msg={ 195 "tract": "observe", 196 "event": "observed", 197 "observer": "test-observer", 198 "segment": "120000_300", 199 }, 200 app="observer", 201 tract="observe", 202 event="observed", 203 ) 204 205 # Should not raise 206 handle_observed(ctx) 207 208 # No history should be created 209 hist_dir = observer_journal.observers_dir / "testkey1" / "hist" 210 assert not hist_dir.exists() 211 212 def test_handle_transferred(self, observer_journal, monkeypatch): 213 """Handler records transferred status, stats, and queues rescan.""" 214 import think.callosum as callosum_module 215 216 calls = [] 217 monkeypatch.setattr( 218 callosum_module, 219 "callosum_send", 220 lambda *a, **kw: calls.append((a, kw)) or True, 221 ) 222 223 ctx = EventContext( 224 msg={ 225 "tract": "observe", 226 "event": "transferred", 227 "observer": "test-observer", 228 "segment": "120000_300", 229 "day": "20250103", 230 }, 231 app="observer", 232 tract="observe", 233 event="transferred", 234 ) 235 236 handle_transferred(ctx) 237 238 hist_path = ( 239 observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl" 240 ) 241 assert hist_path.exists() 242 with open(hist_path) as f: 243 record = json.loads(f.readline()) 244 245 assert record["type"] == "transferred" 246 assert record["segment"] == "120000_300" 247 248 with open(observer_journal.observer_path) as f: 249 data = json.load(f) 250 assert data["stats"]["segments_transferred"] == 1 251 252 assert calls == [ 253 ( 254 ("supervisor", "request"), 255 {"cmd": ["sol", "indexer", "--rescan"]}, 256 ) 257 ]