personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for observer app event handlers."""
5
6from __future__ import annotations
7
8import json
9
10import pytest
11
12from apps.events import EventContext
13from apps.observer.events import handle_observed, handle_transferred
14
15
16@pytest.fixture
17def observer_journal(tmp_path, monkeypatch):
18 """Create a temporary journal with a observer registered."""
19 from convey import state
20
21 journal = tmp_path / "journal"
22 journal.mkdir()
23
24 # Set convey state (used by apps.utils for storage paths)
25 monkeypatch.setattr(state, "journal_root", str(journal))
26
27 # Create observers directory
28 observers_dir = journal / "apps" / "observer" / "observers"
29 observers_dir.mkdir(parents=True)
30
31 # Create a test observer
32 observer_data = {
33 "key": "testkey123456789abcdef",
34 "name": "test-observer",
35 "created_at": 1704312000000,
36 "last_seen": None,
37 "last_segment": None,
38 "enabled": True,
39 "stats": {
40 "segments_received": 5,
41 "bytes_received": 1024,
42 },
43 }
44 observer_path = observers_dir / "testkey1.json"
45 with open(observer_path, "w") as f:
46 json.dump(observer_data, f)
47
48 class Env:
49 def __init__(self):
50 self.journal = journal
51 self.observers_dir = observers_dir
52 self.observer_path = observer_path
53
54 return Env()
55
56
57class TestHandleObserved:
58 """Tests for handle_observed event handler."""
59
60 def test_records_observed_for_observer(self, observer_journal):
61 """Handler records observed status for observer segment."""
62 ctx = EventContext(
63 msg={
64 "tract": "observe",
65 "event": "observed",
66 "observer": "test-observer",
67 "segment": "120000_300",
68 "day": "20250103",
69 },
70 app="observer",
71 tract="observe",
72 event="observed",
73 )
74
75 handle_observed(ctx)
76
77 # Check history was written
78 hist_path = (
79 observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
80 )
81 assert hist_path.exists()
82
83 with open(hist_path) as f:
84 record = json.loads(f.readline())
85
86 assert record["type"] == "observed"
87 assert record["segment"] == "120000_300"
88 assert "ts" in record
89
90 # Check stat was incremented
91 with open(observer_journal.observer_path) as f:
92 data = json.load(f)
93 assert data["stats"]["segments_observed"] == 1
94
95 def test_multiple_observed_events(self, observer_journal):
96 """Handler appends multiple observed records."""
97 for segment in ["120000_300", "130000_300", "140000_300"]:
98 ctx = EventContext(
99 msg={
100 "tract": "observe",
101 "event": "observed",
102 "observer": "test-observer",
103 "segment": segment,
104 "day": "20250103",
105 },
106 app="observer",
107 tract="observe",
108 event="observed",
109 )
110 handle_observed(ctx)
111
112 # Check all records written
113 hist_path = (
114 observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
115 )
116 with open(hist_path) as f:
117 lines = f.readlines()
118
119 assert len(lines) == 3
120 assert json.loads(lines[0])["segment"] == "120000_300"
121 assert json.loads(lines[1])["segment"] == "130000_300"
122 assert json.loads(lines[2])["segment"] == "140000_300"
123
124 # Check stat incremented 3 times
125 with open(observer_journal.observer_path) as f:
126 data = json.load(f)
127 assert data["stats"]["segments_observed"] == 3
128
129 def test_ignores_non_observer_events(self, observer_journal):
130 """Handler ignores events without observer field."""
131 ctx = EventContext(
132 msg={
133 "tract": "observe",
134 "event": "observed",
135 "segment": "120000_300",
136 "day": "20250103",
137 },
138 app="observer",
139 tract="observe",
140 event="observed",
141 )
142
143 handle_observed(ctx)
144
145 # No history should be created
146 hist_dir = observer_journal.observers_dir / "testkey1" / "hist"
147 assert not hist_dir.exists()
148
149 def test_ignores_unknown_observer(self, observer_journal):
150 """Handler ignores events for unknown observers."""
151 ctx = EventContext(
152 msg={
153 "tract": "observe",
154 "event": "observed",
155 "observer": "unknown-observer",
156 "segment": "120000_300",
157 "day": "20250103",
158 },
159 app="observer",
160 tract="observe",
161 event="observed",
162 )
163
164 handle_observed(ctx)
165
166 # No history should be created for unknown observer
167 hist_dir = observer_journal.observers_dir / "testkey1" / "hist"
168 assert not hist_dir.exists()
169
170 def test_handles_missing_segment(self, observer_journal):
171 """Handler handles events missing segment field."""
172 ctx = EventContext(
173 msg={
174 "tract": "observe",
175 "event": "observed",
176 "observer": "test-observer",
177 "day": "20250103",
178 },
179 app="observer",
180 tract="observe",
181 event="observed",
182 )
183
184 # Should not raise
185 handle_observed(ctx)
186
187 # No history should be created
188 hist_dir = observer_journal.observers_dir / "testkey1" / "hist"
189 assert not hist_dir.exists()
190
191 def test_handles_missing_day(self, observer_journal):
192 """Handler handles events missing day field."""
193 ctx = EventContext(
194 msg={
195 "tract": "observe",
196 "event": "observed",
197 "observer": "test-observer",
198 "segment": "120000_300",
199 },
200 app="observer",
201 tract="observe",
202 event="observed",
203 )
204
205 # Should not raise
206 handle_observed(ctx)
207
208 # No history should be created
209 hist_dir = observer_journal.observers_dir / "testkey1" / "hist"
210 assert not hist_dir.exists()
211
212 def test_handle_transferred(self, observer_journal, monkeypatch):
213 """Handler records transferred status, stats, and queues rescan."""
214 import think.callosum as callosum_module
215
216 calls = []
217 monkeypatch.setattr(
218 callosum_module,
219 "callosum_send",
220 lambda *a, **kw: calls.append((a, kw)) or True,
221 )
222
223 ctx = EventContext(
224 msg={
225 "tract": "observe",
226 "event": "transferred",
227 "observer": "test-observer",
228 "segment": "120000_300",
229 "day": "20250103",
230 },
231 app="observer",
232 tract="observe",
233 event="transferred",
234 )
235
236 handle_transferred(ctx)
237
238 hist_path = (
239 observer_journal.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
240 )
241 assert hist_path.exists()
242 with open(hist_path) as f:
243 record = json.loads(f.readline())
244
245 assert record["type"] == "transferred"
246 assert record["segment"] == "120000_300"
247
248 with open(observer_journal.observer_path) as f:
249 data = json.load(f)
250 assert data["stats"]["segments_transferred"] == 1
251
252 assert calls == [
253 (
254 ("supervisor", "request"),
255 {"cmd": ["sol", "indexer", "--rescan"]},
256 )
257 ]