personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for observe/sync.py - sync service for remote uploads."""
5
6import signal
7import threading
8import time
9from unittest.mock import MagicMock, patch
10
11import pytest
12import requests
13
14
15@pytest.fixture
16def sync_journal(tmp_path):
17 """Create a temporary journal structure for sync tests.
18
19 Returns a dict with 'path' and 'day' keys.
20 """
21 from datetime import datetime
22
23 journal = tmp_path / "journal"
24 journal.mkdir()
25
26 # Use today's date so get_pending_segments finds it within days_back
27 day = datetime.now().strftime("%Y%m%d")
28 day_dir = journal / day
29 day_dir.mkdir()
30
31 # Create segment with files under default stream
32 segment = "120000_300"
33 stream_dir = day_dir / "default"
34 stream_dir.mkdir()
35 segment_dir = stream_dir / segment
36 segment_dir.mkdir()
37
38 audio_file = segment_dir / "audio.flac"
39 audio_file.write_bytes(b"audio data for testing")
40
41 video_file = segment_dir / "screen.webm"
42 video_file.write_bytes(b"video data for testing")
43
44 # Create health directory
45 health_dir = day_dir / "health"
46 health_dir.mkdir()
47
48 return {"path": journal, "day": day}
49
50
51def test_compute_file_sha256(sync_journal):
52 """Test SHA256 computation."""
53 from observe.utils import compute_file_sha256
54
55 journal = sync_journal["path"]
56 day = sync_journal["day"]
57 test_file = journal / day / "default" / "120000_300" / "audio.flac"
58 sha = compute_file_sha256(test_file)
59
60 # Just verify it's a valid SHA256 hex string
61 assert len(sha) == 64
62 assert all(c in "0123456789abcdef" for c in sha)
63
64
65def test_get_sync_state_path(sync_journal, monkeypatch):
66 """Test sync state path generation."""
67 from observe.sync import get_sync_state_path
68
69 journal = sync_journal["path"]
70 day = sync_journal["day"]
71 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
72
73 path = get_sync_state_path(day)
74 assert path == journal / day / "health" / "sync.jsonl"
75
76
77def test_append_and_load_sync_state(sync_journal, monkeypatch):
78 """Test appending and loading sync state records."""
79 from observe.sync import append_sync_record, load_sync_state
80
81 journal = sync_journal["path"]
82 day = sync_journal["day"]
83 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
84
85 # Initially empty
86 records = load_sync_state(day)
87 assert records == []
88
89 # Append a pending record
90 record1 = {
91 "ts": 1234567890000,
92 "segment": "120000_300",
93 "status": "pending",
94 "files": [{"name": "audio.flac", "sha256": "abc123"}],
95 }
96 append_sync_record(day, record1)
97
98 # Append a confirmed record
99 record2 = {
100 "ts": 1234567891000,
101 "segment": "120000_300",
102 "status": "confirmed",
103 }
104 append_sync_record(day, record2)
105
106 # Load and verify
107 records = load_sync_state(day)
108 assert len(records) == 2
109 assert records[0]["status"] == "pending"
110 assert records[1]["status"] == "confirmed"
111
112
113def test_get_pending_segments(sync_journal, monkeypatch):
114 """Test scanning for pending segments."""
115 from observe.sync import append_sync_record, get_pending_segments
116
117 journal = sync_journal["path"]
118 day = sync_journal["day"]
119 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
120
121 # Add pending segment
122 append_sync_record(
123 day,
124 {
125 "ts": 1234567890000,
126 "segment": "120000_300",
127 "status": "pending",
128 "files": [{"name": "audio.flac", "sha256": "abc123"}],
129 },
130 )
131
132 # Add another pending segment
133 segment2_dir = journal / day / "default" / "130000_300"
134 segment2_dir.mkdir(parents=True)
135 append_sync_record(
136 day,
137 {
138 "ts": 1234567890001,
139 "segment": "130000_300",
140 "status": "pending",
141 "files": [{"name": "audio.flac", "sha256": "def456"}],
142 },
143 )
144
145 # Add a confirmed segment (should not be returned)
146 append_sync_record(
147 day,
148 {
149 "ts": 1234567890002,
150 "segment": "140000_300",
151 "status": "pending",
152 "files": [{"name": "audio.flac", "sha256": "ghi789"}],
153 },
154 )
155 append_sync_record(
156 day,
157 {
158 "ts": 1234567890003,
159 "segment": "140000_300",
160 "status": "confirmed",
161 },
162 )
163
164 # Get pending
165 pending = get_pending_segments(days_back=7)
166
167 assert len(pending) == 2
168 segments = {p.segment for p in pending}
169 assert "120000_300" in segments
170 assert "130000_300" in segments
171 assert "140000_300" not in segments # Already confirmed
172
173
174def test_get_pending_segments_empty(sync_journal, monkeypatch):
175 """Test scanning when no pending segments exist."""
176 from observe.sync import get_pending_segments
177
178 journal = sync_journal["path"]
179 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
180
181 pending = get_pending_segments(days_back=7)
182 assert pending == []
183
184
185class TestSyncService:
186 """Tests for SyncService class."""
187
188 @pytest.fixture
189 def mock_remote_client(self):
190 """Create a mock RemoteClient."""
191 with patch("observe.sync.RemoteClient") as mock:
192 client = MagicMock()
193 client.session = MagicMock()
194 mock.return_value = client
195 yield client
196
197 @pytest.fixture
198 def mock_callosum(self):
199 """Create a mock CallosumConnection."""
200 with patch("observe.sync.CallosumConnection") as mock:
201 conn = MagicMock()
202 mock.return_value = conn
203 yield conn
204
205 def test_sync_service_init(self, sync_journal, monkeypatch):
206 """Test SyncService initialization."""
207 from observe.sync import SyncService
208
209 journal = sync_journal["path"]
210 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
211
212 service = SyncService(
213 remote_url="https://server/ingest/key",
214 days_back=7,
215 )
216
217 assert service.remote_url == "https://server/ingest/key"
218 assert service.days_back == 7
219
220 def test_check_confirmation_success(
221 self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
222 ):
223 """Test successful sha256 confirmation check."""
224 from observe.sync import SyncService
225
226 journal = sync_journal["path"]
227 day = sync_journal["day"]
228 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
229
230 service = SyncService("https://server/ingest/key")
231 service._client = mock_remote_client
232
233 # Mock segments endpoint response
234 mock_response = MagicMock()
235 mock_response.status_code = 200
236 mock_response.json.return_value = [
237 {
238 "key": "120000_300",
239 "files": [
240 {"name": "audio.flac", "sha256": "abc123", "size": 100},
241 {"name": "screen.webm", "sha256": "def456", "size": 200},
242 ],
243 }
244 ]
245 mock_remote_client.session.get.return_value = mock_response
246
247 # Check with matching sha256s
248 result = service._check_confirmation(
249 day,
250 "120000_300",
251 {"audio.flac": "abc123", "screen.webm": "def456"},
252 )
253
254 assert result is True
255 mock_remote_client.session.get.assert_called_once()
256
257 def test_check_confirmation_mismatch(
258 self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
259 ):
260 """Test sha256 mismatch returns False."""
261 from observe.sync import SyncService
262
263 journal = sync_journal["path"]
264 day = sync_journal["day"]
265 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
266
267 service = SyncService("https://server/ingest/key")
268 service._client = mock_remote_client
269
270 # Mock response with wrong sha256
271 mock_response = MagicMock()
272 mock_response.status_code = 200
273 mock_response.json.return_value = [
274 {
275 "key": "120000_300",
276 "files": [
277 {"name": "audio.flac", "sha256": "wrong_hash", "size": 100},
278 ],
279 }
280 ]
281 mock_remote_client.session.get.return_value = mock_response
282
283 result = service._check_confirmation(
284 day,
285 "120000_300",
286 {"audio.flac": "abc123"},
287 )
288
289 assert result is False
290
291 def test_check_confirmation_segment_not_found(
292 self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
293 ):
294 """Test segment not in response returns False."""
295 from observe.sync import SyncService
296
297 journal = sync_journal["path"]
298 day = sync_journal["day"]
299 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
300
301 service = SyncService("https://server/ingest/key")
302 service._client = mock_remote_client
303
304 # Mock empty response
305 mock_response = MagicMock()
306 mock_response.status_code = 200
307 mock_response.json.return_value = []
308 mock_remote_client.session.get.return_value = mock_response
309
310 result = service._check_confirmation(
311 day,
312 "120000_300",
313 {"audio.flac": "abc123"},
314 )
315
316 assert result is False
317
318 def test_cleanup_segment(self, sync_journal, monkeypatch):
319 """Test segment cleanup deletes files."""
320 from observe.sync import SyncService
321
322 journal = sync_journal["path"]
323 day = sync_journal["day"]
324 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
325
326 service = SyncService("https://server/ingest/key")
327
328 segment_dir = journal / day / "default" / "120000_300"
329 audio_file = segment_dir / "audio.flac"
330 video_file = segment_dir / "screen.webm"
331
332 # Verify files exist
333 assert audio_file.exists()
334 assert video_file.exists()
335
336 # Cleanup
337 service._cleanup_segment(segment_dir, [audio_file, video_file])
338
339 # Files should be deleted
340 assert not audio_file.exists()
341 assert not video_file.exists()
342 # Directory should be removed if empty
343 assert not segment_dir.exists()
344
345 def test_handle_observing_message(
346 self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
347 ):
348 """Test handling observe.observing message."""
349 from observe.sync import SyncService, load_sync_state
350
351 journal = sync_journal["path"]
352 day = sync_journal["day"]
353 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
354
355 service = SyncService("https://server/ingest/key")
356 service._callosum = mock_callosum
357
358 # Simulate observing message with metadata
359 message = {
360 "tract": "observe",
361 "event": "observing",
362 "day": day,
363 "segment": "120000_300",
364 "files": ["audio.flac", "screen.webm"],
365 "host": "testhost",
366 "platform": "linux",
367 "stream": "default",
368 "meta": {"facet": "work"},
369 }
370
371 service._handle_message(message)
372
373 # Check pending record was written with metadata
374 records = load_sync_state(day)
375 assert len(records) == 1
376 assert records[0]["status"] == "pending"
377 assert records[0]["segment"] == "120000_300"
378 assert len(records[0]["files"]) == 2
379 # Verify metadata was extracted and merged
380 assert records[0]["meta"]["host"] == "testhost"
381 assert records[0]["meta"]["platform"] == "linux"
382 assert records[0]["meta"]["facet"] == "work"
383
384 # Check segment was queued with metadata
385 assert service._queue.qsize() == 1
386 seg_info = service._queue.get_nowait()
387 assert seg_info.meta["host"] == "testhost"
388 assert seg_info.meta["facet"] == "work"
389
390
391def test_sync_service_startup_with_pending(sync_journal, monkeypatch):
392 """Test that startup loads pending segments into the queue with metadata."""
393 from observe.sync import SyncService, append_sync_record
394
395 journal = sync_journal["path"]
396 day = sync_journal["day"]
397 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
398
399 # Add pending segment with metadata
400 append_sync_record(
401 day,
402 {
403 "ts": 1234567890000,
404 "segment": "120000_300",
405 "status": "pending",
406 "files": [{"name": "audio.flac", "sha256": "abc123"}],
407 "meta": {"host": "remote-host", "platform": "darwin"},
408 },
409 )
410
411 with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"):
412 service = SyncService("https://server/ingest/key")
413 # Replace worker with no-op so thread exits immediately
414 service._sync_worker = lambda: None
415 service.start()
416
417 # Pending segment should have been queued with metadata
418 assert service._queue.qsize() == 1
419 seg_info = service._queue.get_nowait()
420 assert seg_info.segment == "120000_300"
421 assert seg_info.day == day
422 assert seg_info.meta["host"] == "remote-host"
423 assert seg_info.meta["platform"] == "darwin"
424
425 service.stop()
426
427
428def test_process_segment_skips_upload_if_already_confirmed(sync_journal, monkeypatch):
429 """Test that segment already on server is skipped without upload."""
430 from observe.sync import SegmentInfo, SyncService, UploadResult
431
432 journal = sync_journal["path"]
433 day = sync_journal["day"]
434 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
435
436 # Create SegmentInfo
437 seg_info = SegmentInfo(
438 day=day,
439 segment="120000_300",
440 files=[
441 {"name": "audio.flac", "sha256": "abc123"},
442 {"name": "screen.webm", "sha256": "def456"},
443 ],
444 meta={"stream": "default"},
445 )
446
447 with patch("observe.sync.CallosumConnection") as mock_callosum_class:
448 mock_callosum = MagicMock()
449 mock_callosum_class.return_value = mock_callosum
450
451 with patch("observe.sync.RemoteClient") as mock_client_class:
452 mock_client = MagicMock()
453 mock_session = MagicMock()
454
455 # Simulate server already has the segment with matching SHA256
456 server_response = MagicMock()
457 server_response.status_code = 200
458 server_response.json.return_value = [
459 {
460 "key": "120000_300",
461 "files": [
462 {"name": "audio.flac", "sha256": "abc123"},
463 {"name": "screen.webm", "sha256": "def456"},
464 ],
465 }
466 ]
467 mock_session.get.return_value = server_response
468 mock_client.session = mock_session
469 mock_client.upload_segment = MagicMock(return_value=UploadResult(True))
470 mock_client_class.return_value = mock_client
471
472 service = SyncService("https://server/ingest/key")
473
474 # Call _process_segment directly (internal method)
475 service._process_segment(seg_info)
476
477 # Upload should NOT have been called (already confirmed)
478 mock_client.upload_segment.assert_not_called()
479
480
481def test_process_segment_uploads_if_not_on_server(sync_journal, monkeypatch):
482 """Test that segment not on server is uploaded."""
483 from observe.sync import SegmentInfo, SyncService, UploadResult
484
485 journal = sync_journal["path"]
486 day = sync_journal["day"]
487 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
488
489 seg_info = SegmentInfo(
490 day=day,
491 segment="120000_300",
492 files=[
493 {"name": "audio.flac", "sha256": "abc123"},
494 ],
495 meta={"stream": "default"},
496 )
497
498 with patch("observe.sync.CallosumConnection") as mock_callosum_class:
499 mock_callosum = MagicMock()
500 mock_callosum_class.return_value = mock_callosum
501
502 with patch("observe.sync.RemoteClient") as mock_client_class:
503 mock_client = MagicMock()
504 mock_session = MagicMock()
505
506 # First call: server doesn't have segment (pre-check)
507 # Second call: server has segment (post-upload confirm)
508 responses = [
509 MagicMock(status_code=200, json=MagicMock(return_value=[])),
510 MagicMock(
511 status_code=200,
512 json=MagicMock(
513 return_value=[
514 {
515 "key": "120000_300",
516 "files": [{"name": "audio.flac", "sha256": "abc123"}],
517 }
518 ]
519 ),
520 ),
521 ]
522 mock_session.get.side_effect = responses
523 mock_client.session = mock_session
524 mock_client.upload_segment = MagicMock(return_value=UploadResult(True))
525 mock_client_class.return_value = mock_client
526
527 service = SyncService("https://server/ingest/key")
528 service._process_segment(seg_info)
529
530 # Upload SHOULD have been called
531 mock_client.upload_segment.assert_called_once()
532
533
534def test_process_segment_passes_metadata_to_upload(sync_journal, monkeypatch):
535 """Test that metadata is passed through to upload_segment call."""
536 from observe.sync import SegmentInfo, SyncService, UploadResult
537
538 journal = sync_journal["path"]
539 day = sync_journal["day"]
540 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
541
542 # Create SegmentInfo with metadata
543 seg_info = SegmentInfo(
544 day=day,
545 segment="120000_300",
546 files=[
547 {"name": "audio.flac", "sha256": "abc123"},
548 ],
549 meta={
550 "host": "laptop",
551 "platform": "linux",
552 "facet": "meetings",
553 "stream": "default",
554 },
555 )
556
557 with patch("observe.sync.CallosumConnection") as mock_callosum_class:
558 mock_callosum = MagicMock()
559 mock_callosum_class.return_value = mock_callosum
560
561 with patch("observe.sync.RemoteClient") as mock_client_class:
562 mock_client = MagicMock()
563 mock_session = MagicMock()
564
565 # First call: server doesn't have segment (pre-check)
566 # Second call: server has segment (post-upload confirm)
567 responses = [
568 MagicMock(status_code=200, json=MagicMock(return_value=[])),
569 MagicMock(
570 status_code=200,
571 json=MagicMock(
572 return_value=[
573 {
574 "key": "120000_300",
575 "files": [{"name": "audio.flac", "sha256": "abc123"}],
576 }
577 ]
578 ),
579 ),
580 ]
581 mock_session.get.side_effect = responses
582 mock_client.session = mock_session
583 mock_client.upload_segment = MagicMock(return_value=UploadResult(True))
584 mock_client_class.return_value = mock_client
585
586 service = SyncService("https://server/ingest/key")
587 service._process_segment(seg_info)
588
589 # Verify upload was called with metadata
590 mock_client.upload_segment.assert_called_once()
591 call_kwargs = mock_client.upload_segment.call_args.kwargs
592 assert call_kwargs["meta"] == {
593 "host": "laptop",
594 "platform": "linux",
595 "facet": "meetings",
596 "stream": "default",
597 }
598
599
600def test_handle_message_skips_zero_byte_files(sync_journal, monkeypatch):
601 """Test that 0-byte files are skipped during message handling."""
602 from observe.sync import SyncService, load_sync_state
603
604 journal = sync_journal["path"]
605 day = sync_journal["day"]
606 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
607
608 # Create segment directory with mixed files
609 seg_dir = journal / day / "default" / "120000_300"
610 seg_dir.mkdir(parents=True, exist_ok=True)
611 (seg_dir / "audio.flac").write_bytes(b"real audio data")
612 (seg_dir / "screen.webm").write_bytes(b"") # 0-byte
613
614 with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"):
615 service = SyncService("https://server/ingest/key")
616
617 message = {
618 "tract": "observe",
619 "event": "observing",
620 "day": day,
621 "segment": "120000_300",
622 "files": ["audio.flac", "screen.webm"],
623 "stream": "default",
624 }
625 service._handle_message(message)
626
627 # Only valid file should be queued
628 assert service._queue.qsize() == 1
629 seg_info = service._queue.get_nowait()
630 assert len(seg_info.files) == 1
631 assert seg_info.files[0]["name"] == "audio.flac"
632
633 # Pending record should only have 1 file
634 records = load_sync_state(day)
635 assert len(records) == 1
636 assert len(records[0]["files"]) == 1
637
638
639def test_handle_message_skips_all_zero_byte_files(sync_journal, monkeypatch):
640 """Test that segment is not queued when all files are 0-byte."""
641 from observe.sync import SyncService, load_sync_state
642
643 journal = sync_journal["path"]
644 day = sync_journal["day"]
645 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
646
647 # Create segment directory with only 0-byte files
648 seg_dir = journal / day / "default" / "120000_300"
649 seg_dir.mkdir(parents=True, exist_ok=True)
650 (seg_dir / "audio.flac").write_bytes(b"")
651 (seg_dir / "screen.webm").write_bytes(b"")
652
653 with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"):
654 service = SyncService("https://server/ingest/key")
655
656 message = {
657 "tract": "observe",
658 "event": "observing",
659 "day": day,
660 "segment": "120000_300",
661 "files": ["audio.flac", "screen.webm"],
662 "stream": "default",
663 }
664 service._handle_message(message)
665
666 # No segment should be queued
667 assert service._queue.qsize() == 0
668
669 # No pending record should be written
670 records = load_sync_state(day)
671 assert len(records) == 0
672
673
674def test_process_segment_duplicate_skips_confirmation(sync_journal, monkeypatch):
675 """Test that duplicate upload skips confirmation polling."""
676 from observe.sync import SegmentInfo, SyncService, UploadResult, load_sync_state
677
678 journal = sync_journal["path"]
679 day = sync_journal["day"]
680 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
681
682 seg_info = SegmentInfo(
683 day=day,
684 segment="120000_300",
685 files=[
686 {"name": "audio.flac", "sha256": "abc123"},
687 ],
688 meta={"stream": "default"},
689 )
690
691 # Create the segment file so cleanup has something to work with
692 seg_dir = journal / day / "default" / "120000_300"
693 seg_dir.mkdir(parents=True, exist_ok=True)
694 (seg_dir / "audio.flac").write_bytes(b"audio data")
695
696 with patch("observe.sync.CallosumConnection") as mock_callosum_class:
697 mock_callosum = MagicMock()
698 mock_callosum_class.return_value = mock_callosum
699
700 with patch("observe.sync.RemoteClient") as mock_client_class:
701 mock_client = MagicMock()
702 mock_session = MagicMock()
703
704 # Pre-check: server doesn't have segment yet
705 mock_session.get.return_value = MagicMock(
706 status_code=200, json=MagicMock(return_value=[])
707 )
708 mock_client.session = mock_session
709 # Upload returns duplicate
710 mock_client.upload_segment = MagicMock(
711 return_value=UploadResult(True, duplicate=True)
712 )
713 mock_client_class.return_value = mock_client
714
715 service = SyncService("https://server/ingest/key")
716 service._process_segment(seg_info)
717
718 # Upload should have been called
719 mock_client.upload_segment.assert_called_once()
720
721 # Confirmation polling should NOT have happened
722 # (only 1 GET call for pre-check, no additional confirmation GETs)
723 assert mock_session.get.call_count == 1
724
725 # Confirmed record should have been written
726 records = load_sync_state(day)
727 confirmed = [r for r in records if r.get("status") == "confirmed"]
728 assert len(confirmed) == 1
729 assert confirmed[0]["segment"] == "120000_300"
730
731
732def test_sync_service_stop_drains_queue(sync_journal, monkeypatch):
733 """stop() should drain queued segments before returning."""
734 from observe.sync import SegmentInfo, SyncService
735
736 journal = sync_journal["path"]
737 day = sync_journal["day"]
738 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
739
740 with (
741 patch("observe.sync.RemoteClient"),
742 patch("observe.sync.CallosumConnection"),
743 ):
744 service = SyncService("https://server/ingest/key")
745 # Prevent start() from running the real worker
746 real_worker = service._sync_worker
747 service._sync_worker = lambda: None
748 service.start()
749
750 assert service._worker_thread is not None
751 service._worker_thread.join(timeout=1.0)
752 assert not service._worker_thread.is_alive()
753
754 service._queue.put(
755 SegmentInfo(
756 day=day, segment="120000_301", files=[{"name": "audio.flac"}], meta={}
757 )
758 )
759 service._queue.put(
760 SegmentInfo(
761 day=day, segment="120000_302", files=[{"name": "audio.flac"}], meta={}
762 )
763 )
764
765 service._sync_worker = real_worker
766 service._stop_event.clear()
767
768 with patch.object(service, "_process_segment") as mock_process:
769 mock_process.return_value = None
770
771 service._worker_thread = threading.Thread(
772 target=service._sync_worker, daemon=False
773 )
774 service._worker_thread.start()
775
776 service.stop()
777
778 assert mock_process.call_count == 2
779 service._worker_thread.join(timeout=1.0)
780
781
782def test_sync_service_stop_disconnects_callosum_first(sync_journal, monkeypatch):
783 """stop() should stop callosum before setting the stop event."""
784 from observe.sync import SyncService
785
786 journal = sync_journal["path"]
787 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
788
789 mock_callosum = MagicMock()
790 shutdown_order: list[str] = []
791 service = SyncService("https://server/ingest/key")
792 service._callosum = mock_callosum
793 service._worker_thread = None
794 service._stop_event = MagicMock()
795
796 service._callosum.stop.side_effect = lambda: shutdown_order.append("callosum")
797 service._stop_event.set.side_effect = lambda: shutdown_order.append("stop_event")
798
799 service.stop()
800
801 assert shutdown_order == ["callosum", "stop_event"]
802
803
804def test_sync_service_worker_thread_not_daemon(sync_journal, monkeypatch):
805 """Worker thread should be non-daemon."""
806 from observe.sync import SyncService
807
808 journal = sync_journal["path"]
809 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
810
811 with (
812 patch("observe.sync.get_pending_segments", return_value=[]),
813 patch("observe.sync.RemoteClient"),
814 patch("observe.sync.CallosumConnection"),
815 ):
816 service = SyncService("https://server/ingest/key")
817 service.start()
818
819 assert service._worker_thread is not None
820 assert service._worker_thread.daemon is False
821
822 service.stop()
823
824
825def test_sync_service_drain_completes_current_segment(sync_journal, monkeypatch):
826 """Current segment should finish when stop_event is set and draining is active."""
827 from observe.sync import SegmentInfo, SyncService, UploadResult
828
829 journal = sync_journal["path"]
830 day = sync_journal["day"]
831 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
832
833 service = SyncService("https://server/ingest/key")
834 service._client.upload_segment = MagicMock(return_value=UploadResult(True))
835 service._check_confirmation = MagicMock(side_effect=[False, True])
836
837 segment_info = SegmentInfo(
838 day=day,
839 segment="120000_300",
840 files=[{"name": "audio.flac", "sha256": "abc123"}],
841 meta={"stream": "default"},
842 )
843 service._queue.put(segment_info)
844
845 service._draining = True
846 service._stop_event.set()
847
848 def upload_side_effect(*_args, **_kwargs):
849 service._stop_event.set()
850 return UploadResult(True)
851
852 service._client.upload_segment.side_effect = upload_side_effect
853
854 service._process_segment(service._queue.get_nowait())
855
856 assert service._client.upload_segment.call_count == 1
857 assert not (journal / day / "default" / "120000_300" / "audio.flac").exists()
858
859
860def test_main_sigterm_triggers_stop(monkeypatch):
861 """main() should stop the service on SIGTERM."""
862 import argparse
863
864 import observe.sync
865
866 handlers: dict[int, object] = {}
867
868 def capture_signal(signum, handler):
869 handlers[signum] = handler
870 return handler
871
872 args = argparse.Namespace(remote="https://server/ingest/key", days_back=7)
873
874 with (
875 patch("observe.sync.setup_cli", return_value=args),
876 patch("observe.sync.SyncService") as mock_service,
877 patch("observe.sync.signal.signal", side_effect=capture_signal),
878 ):
879 service_instance = MagicMock()
880 mock_service.return_value = service_instance
881
882 thread = threading.Thread(target=observe.sync.main)
883 thread.start()
884
885 deadline = time.time() + 1.0
886 while signal.SIGTERM not in handlers and time.time() < deadline:
887 time.sleep(0.01)
888 assert signal.SIGTERM in handlers
889 assert signal.SIGINT in handlers
890
891 handlers[signal.SIGTERM](signal.SIGTERM, None)
892 thread.join(timeout=1.0)
893
894 assert thread.is_alive() is False
895 service_instance.start.assert_called_once()
896 service_instance.stop.assert_called_once()
897
898
899class TestCheckRemoteHealth:
900 """Tests for check_remote_health() function."""
901
902 def test_health_check_success(self):
903 """Test successful health check returns True with connection info."""
904 from observe.sync import check_remote_health
905
906 with patch("observe.sync.requests.get") as mock_get:
907 mock_response = MagicMock()
908 mock_response.status_code = 200
909 mock_get.return_value = mock_response
910
911 success, message = check_remote_health(
912 "http://server.local:8000/app/remote/ingest/abc12345xyz"
913 )
914
915 assert success is True
916 assert "server.local:8000" in message
917 assert "abc12345" in message # Key prefix
918
919 def test_health_check_invalid_key(self):
920 """Test 401 response returns False with appropriate message."""
921 from observe.sync import check_remote_health
922
923 with patch("observe.sync.requests.get") as mock_get:
924 mock_response = MagicMock()
925 mock_response.status_code = 401
926 mock_get.return_value = mock_response
927
928 success, message = check_remote_health(
929 "http://server.local:8000/app/remote/ingest/badkey"
930 )
931
932 assert success is False
933 assert "401" in message or "Invalid key" in message
934
935 def test_health_check_revoked_key(self):
936 """Test 403 response returns False with error details."""
937 from observe.sync import check_remote_health
938
939 with patch("observe.sync.requests.get") as mock_get:
940 mock_response = MagicMock()
941 mock_response.status_code = 403
942 mock_response.text = '{"error": "Remote revoked"}'
943 mock_response.json.return_value = {"error": "Remote revoked"}
944 mock_get.return_value = mock_response
945
946 success, message = check_remote_health(
947 "http://server.local:8000/app/remote/ingest/revokedkey"
948 )
949
950 assert success is False
951 assert "403" in message or "revoked" in message.lower()
952
953 def test_health_check_403_non_json_body(self):
954 """Test 403 with non-JSON body doesn't crash."""
955 from observe.sync import check_remote_health
956
957 with patch("observe.sync.requests.get") as mock_get:
958 mock_response = MagicMock()
959 mock_response.status_code = 403
960 mock_response.text = "Forbidden"
961 mock_response.json.side_effect = requests.exceptions.JSONDecodeError(
962 "", "", 0
963 )
964 mock_get.return_value = mock_response
965
966 success, message = check_remote_health(
967 "http://server.local:8000/app/remote/ingest/badkey"
968 )
969
970 assert success is False
971 assert "403" in message
972
973 def test_health_check_connection_refused(self):
974 """Test connection refused returns False with clear message."""
975 from observe.sync import check_remote_health
976
977 with patch("observe.sync.requests.get") as mock_get:
978 mock_get.side_effect = requests.exceptions.ConnectionError(
979 "Connection refused"
980 )
981
982 success, message = check_remote_health(
983 "http://server.local:8000/app/remote/ingest/key123"
984 )
985
986 assert success is False
987 assert "refused" in message.lower() or "connection" in message.lower()
988
989 def test_health_check_timeout(self):
990 """Test timeout returns False with timeout message."""
991 from observe.sync import check_remote_health
992
993 with patch("observe.sync.requests.get") as mock_get:
994 mock_get.side_effect = requests.exceptions.Timeout("timed out")
995
996 success, message = check_remote_health(
997 "http://server.local:8000/app/remote/ingest/key123",
998 timeout=5.0,
999 )
1000
1001 assert success is False
1002 assert "timeout" in message.lower()
1003
1004 def test_health_check_host_not_found(self):
1005 """Test DNS failure returns False with host not found message."""
1006 from observe.sync import check_remote_health
1007
1008 with patch("observe.sync.requests.get") as mock_get:
1009 mock_get.side_effect = requests.exceptions.ConnectionError(
1010 "Name or service not known"
1011 )
1012
1013 success, message = check_remote_health(
1014 "http://nonexistent.invalid/app/remote/ingest/key123"
1015 )
1016
1017 assert success is False
1018 assert "not found" in message.lower() or "connection" in message.lower()