personal memory agent
at main 1018 lines 35 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for observe/sync.py - sync service for remote uploads."""

import signal
import threading
import time
from unittest.mock import MagicMock, patch

import pytest
import requests


@pytest.fixture
def sync_journal(tmp_path):
    """Create a temporary journal structure for sync tests.

    Layout created under ``tmp_path``::

        journal/<today>/default/120000_300/audio.flac
        journal/<today>/default/120000_300/screen.webm
        journal/<today>/health/

    Returns a dict with 'path' (journal root) and 'day' (YYYYMMDD) keys.
    """
    from datetime import datetime

    journal = tmp_path / "journal"
    journal.mkdir()

    # Use today's date so get_pending_segments finds it within days_back
    day = datetime.now().strftime("%Y%m%d")
    day_dir = journal / day
    day_dir.mkdir()

    # Create segment with files under default stream
    segment = "120000_300"
    stream_dir = day_dir / "default"
    stream_dir.mkdir()
    segment_dir = stream_dir / segment
    segment_dir.mkdir()

    audio_file = segment_dir / "audio.flac"
    audio_file.write_bytes(b"audio data for testing")

    video_file = segment_dir / "screen.webm"
    video_file.write_bytes(b"video data for testing")

    # Create health directory
    health_dir = day_dir / "health"
    health_dir.mkdir()

    return {"path": journal, "day": day}


def test_compute_file_sha256(sync_journal):
    """Test SHA256 computation."""
    from observe.utils import compute_file_sha256

    journal = sync_journal["path"]
    day = sync_journal["day"]
    test_file = journal / day / "default" / "120000_300" / "audio.flac"
    sha = compute_file_sha256(test_file)

    # Just verify it's a valid SHA256 hex string
    assert len(sha) == 64
    assert all(c in "0123456789abcdef" for c in sha)


def test_get_sync_state_path(sync_journal, monkeypatch):
    """Test sync state path generation."""
    from observe.sync import get_sync_state_path

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    path = get_sync_state_path(day)
    assert path == journal / day / "health" / "sync.jsonl"


def test_append_and_load_sync_state(sync_journal, monkeypatch):
    """Test appending and loading sync state records."""
    from observe.sync import append_sync_record, load_sync_state

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Initially empty
    records = load_sync_state(day)
    assert records == []

    # Append a pending record
    record1 = {
        "ts": 1234567890000,
        "segment": "120000_300",
        "status": "pending",
        "files": [{"name": "audio.flac", "sha256": "abc123"}],
    }
    append_sync_record(day, record1)

    # Append a confirmed record
    record2 = {
        "ts": 1234567891000,
        "segment": "120000_300",
        "status": "confirmed",
    }
    append_sync_record(day, record2)

    # Load and verify
    records = load_sync_state(day)
    assert len(records) == 2
    assert records[0]["status"] == "pending"
    assert records[1]["status"] == "confirmed"


def test_get_pending_segments(sync_journal, monkeypatch):
    """Test scanning for pending segments."""
    from observe.sync import append_sync_record, get_pending_segments

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Add pending segment
    append_sync_record(
        day,
        {
            "ts": 1234567890000,
            "segment": "120000_300",
            "status": "pending",
            "files": [{"name": "audio.flac", "sha256": "abc123"}],
        },
    )

    # Add another pending segment
    segment2_dir = journal / day / "default" / "130000_300"
    segment2_dir.mkdir(parents=True)
    append_sync_record(
        day,
        {
            "ts": 1234567890001,
            "segment": "130000_300",
            "status": "pending",
            "files": [{"name": "audio.flac", "sha256": "def456"}],
        },
    )

    # Add a confirmed segment (should not be returned)
    append_sync_record(
        day,
        {
            "ts": 1234567890002,
            "segment": "140000_300",
            "status": "pending",
            "files": [{"name": "audio.flac", "sha256": "ghi789"}],
        },
    )
    append_sync_record(
        day,
        {
            "ts": 1234567890003,
            "segment": "140000_300",
            "status": "confirmed",
        },
    )

    # Get pending
    pending = get_pending_segments(days_back=7)

    assert len(pending) == 2
    segments = {p.segment for p in pending}
    assert "120000_300" in segments
    assert "130000_300" in segments
    assert "140000_300" not in segments  # Already confirmed


def test_get_pending_segments_empty(sync_journal, monkeypatch):
    """Test scanning when no pending segments exist."""
    from observe.sync import get_pending_segments

    journal = sync_journal["path"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    pending = get_pending_segments(days_back=7)
    assert pending == []


class TestSyncService:
    """Tests for SyncService class."""

    @pytest.fixture
    def mock_remote_client(self):
        """Create a mock RemoteClient."""
        with patch("observe.sync.RemoteClient") as mock:
            client = MagicMock()
            client.session = MagicMock()
            mock.return_value = client
            yield client

    @pytest.fixture
    def mock_callosum(self):
        """Create a mock CallosumConnection."""
        with patch("observe.sync.CallosumConnection") as mock:
            conn = MagicMock()
            mock.return_value = conn
            yield conn

    def test_sync_service_init(self, sync_journal, monkeypatch):
        """Test SyncService initialization."""
        from observe.sync import SyncService

        journal = sync_journal["path"]
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

        service = SyncService(
            remote_url="https://server/ingest/key",
            days_back=7,
        )

        assert service.remote_url == "https://server/ingest/key"
        assert service.days_back == 7

    def test_check_confirmation_success(
        self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
    ):
        """Test successful sha256 confirmation check."""
        from observe.sync import SyncService

        journal = sync_journal["path"]
        day = sync_journal["day"]
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

        service = SyncService("https://server/ingest/key")
        service._client = mock_remote_client

        # Mock segments endpoint response
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = [
            {
                "key": "120000_300",
                "files": [
                    {"name": "audio.flac", "sha256": "abc123", "size": 100},
                    {"name": "screen.webm", "sha256": "def456", "size": 200},
                ],
            }
        ]
        mock_remote_client.session.get.return_value = mock_response

        # Check with matching sha256s
        result = service._check_confirmation(
            day,
            "120000_300",
            {"audio.flac": "abc123", "screen.webm": "def456"},
        )

        assert result is True
        mock_remote_client.session.get.assert_called_once()

    def test_check_confirmation_mismatch(
        self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
    ):
        """Test sha256 mismatch returns False."""
        from observe.sync import SyncService

        journal = sync_journal["path"]
        day = sync_journal["day"]
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

        service = SyncService("https://server/ingest/key")
        service._client = mock_remote_client

        # Mock response with wrong sha256
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = [
            {
                "key": "120000_300",
                "files": [
                    {"name": "audio.flac", "sha256": "wrong_hash", "size": 100},
                ],
            }
        ]
        mock_remote_client.session.get.return_value = mock_response

        result = service._check_confirmation(
            day,
            "120000_300",
            {"audio.flac": "abc123"},
        )

        assert result is False

    def test_check_confirmation_segment_not_found(
        self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
    ):
        """Test segment not in response returns False."""
        from observe.sync import SyncService

        journal = sync_journal["path"]
        day = sync_journal["day"]
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

        service = SyncService("https://server/ingest/key")
        service._client = mock_remote_client

        # Mock empty response
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = []
        mock_remote_client.session.get.return_value = mock_response

        result = service._check_confirmation(
            day,
            "120000_300",
            {"audio.flac": "abc123"},
        )

        assert result is False

    def test_cleanup_segment(self, sync_journal, monkeypatch):
        """Test segment cleanup deletes files."""
        from observe.sync import SyncService

        journal = sync_journal["path"]
        day = sync_journal["day"]
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

        service = SyncService("https://server/ingest/key")

        segment_dir = journal / day / "default" / "120000_300"
        audio_file = segment_dir / "audio.flac"
        video_file = segment_dir / "screen.webm"

        # Verify files exist
        assert audio_file.exists()
        assert video_file.exists()

        # Cleanup
        service._cleanup_segment(segment_dir, [audio_file, video_file])

        # Files should be deleted
        assert not audio_file.exists()
        assert not video_file.exists()
        # Directory should be removed if empty
        assert not segment_dir.exists()

    def test_handle_observing_message(
        self, sync_journal, monkeypatch, mock_remote_client, mock_callosum
    ):
        """Test handling observe.observing message."""
        from observe.sync import SyncService, load_sync_state

        journal = sync_journal["path"]
        day = sync_journal["day"]
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

        service = SyncService("https://server/ingest/key")
        service._callosum = mock_callosum

        # Simulate observing message with metadata
        message = {
            "tract": "observe",
            "event": "observing",
            "day": day,
            "segment": "120000_300",
            "files": ["audio.flac", "screen.webm"],
            "host": "testhost",
            "platform": "linux",
            "stream": "default",
            "meta": {"facet": "work"},
        }

        service._handle_message(message)

        # Check pending record was written with metadata
        records = load_sync_state(day)
        assert len(records) == 1
        assert records[0]["status"] == "pending"
        assert records[0]["segment"] == "120000_300"
        assert len(records[0]["files"]) == 2
        # Verify metadata was extracted and merged
        assert records[0]["meta"]["host"] == "testhost"
        assert records[0]["meta"]["platform"] == "linux"
        assert records[0]["meta"]["facet"] == "work"

        # Check segment was queued with metadata
        assert service._queue.qsize() == 1
        seg_info = service._queue.get_nowait()
        assert seg_info.meta["host"] == "testhost"
        assert seg_info.meta["facet"] == "work"


def test_sync_service_startup_with_pending(sync_journal, monkeypatch):
    """Test that startup loads pending segments into the queue with metadata."""
    from observe.sync import SyncService, append_sync_record

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Add pending segment with metadata
    append_sync_record(
        day,
        {
            "ts": 1234567890000,
            "segment": "120000_300",
            "status": "pending",
            "files": [{"name": "audio.flac", "sha256": "abc123"}],
            "meta": {"host": "remote-host", "platform": "darwin"},
        },
    )

    with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"):
        service = SyncService("https://server/ingest/key")
        # Replace worker with no-op so thread exits immediately
        service._sync_worker = lambda: None
        service.start()

        try:
            # Pending segment should have been queued with metadata
            assert service._queue.qsize() == 1
            seg_info = service._queue.get_nowait()
            assert seg_info.segment == "120000_300"
            assert seg_info.day == day
            assert seg_info.meta["host"] == "remote-host"
            assert seg_info.meta["platform"] == "darwin"
        finally:
            # Stop even when an assertion fails so a started service is
            # never left behind for later tests.
            service.stop()


def test_process_segment_skips_upload_if_already_confirmed(sync_journal, monkeypatch):
    """Test that segment already on server is skipped without upload."""
    from observe.sync import SegmentInfo, SyncService, UploadResult

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Create SegmentInfo
    seg_info = SegmentInfo(
        day=day,
        segment="120000_300",
        files=[
            {"name": "audio.flac", "sha256": "abc123"},
            {"name": "screen.webm", "sha256": "def456"},
        ],
        meta={"stream": "default"},
    )

    with patch("observe.sync.CallosumConnection") as mock_callosum_class:
        mock_callosum = MagicMock()
        mock_callosum_class.return_value = mock_callosum

        with patch("observe.sync.RemoteClient") as mock_client_class:
            mock_client = MagicMock()
            mock_session = MagicMock()

            # Simulate server already has the segment with matching SHA256
            server_response = MagicMock()
            server_response.status_code = 200
            server_response.json.return_value = [
                {
                    "key": "120000_300",
                    "files": [
                        {"name": "audio.flac", "sha256": "abc123"},
                        {"name": "screen.webm", "sha256": "def456"},
                    ],
                }
            ]
            mock_session.get.return_value = server_response
            mock_client.session = mock_session
            mock_client.upload_segment = MagicMock(return_value=UploadResult(True))
            mock_client_class.return_value = mock_client

            service = SyncService("https://server/ingest/key")

            # Call _process_segment directly (internal method)
            service._process_segment(seg_info)

            # Upload should NOT have been called (already confirmed)
            mock_client.upload_segment.assert_not_called()


def test_process_segment_uploads_if_not_on_server(sync_journal, monkeypatch):
    """Test that segment not on server is uploaded."""
    from observe.sync import SegmentInfo, SyncService, UploadResult

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    seg_info = SegmentInfo(
        day=day,
        segment="120000_300",
        files=[
            {"name": "audio.flac", "sha256": "abc123"},
        ],
        meta={"stream": "default"},
    )

    with patch("observe.sync.CallosumConnection") as mock_callosum_class:
        mock_callosum = MagicMock()
        mock_callosum_class.return_value = mock_callosum

        with patch("observe.sync.RemoteClient") as mock_client_class:
            mock_client = MagicMock()
            mock_session = MagicMock()

            # First call: server doesn't have segment (pre-check)
            # Second call: server has segment (post-upload confirm)
            responses = [
                MagicMock(status_code=200, json=MagicMock(return_value=[])),
                MagicMock(
                    status_code=200,
                    json=MagicMock(
                        return_value=[
                            {
                                "key": "120000_300",
                                "files": [{"name": "audio.flac", "sha256": "abc123"}],
                            }
                        ]
                    ),
                ),
            ]
            mock_session.get.side_effect = responses
            mock_client.session = mock_session
            mock_client.upload_segment = MagicMock(return_value=UploadResult(True))
            mock_client_class.return_value = mock_client

            service = SyncService("https://server/ingest/key")
            service._process_segment(seg_info)

            # Upload SHOULD have been called
            mock_client.upload_segment.assert_called_once()


def test_process_segment_passes_metadata_to_upload(sync_journal, monkeypatch):
    """Test that metadata is passed through to upload_segment call."""
    from observe.sync import SegmentInfo, SyncService, UploadResult

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Create SegmentInfo with metadata
    seg_info = SegmentInfo(
        day=day,
        segment="120000_300",
        files=[
            {"name": "audio.flac", "sha256": "abc123"},
        ],
        meta={
            "host": "laptop",
            "platform": "linux",
            "facet": "meetings",
            "stream": "default",
        },
    )

    with patch("observe.sync.CallosumConnection") as mock_callosum_class:
        mock_callosum = MagicMock()
        mock_callosum_class.return_value = mock_callosum

        with patch("observe.sync.RemoteClient") as mock_client_class:
            mock_client = MagicMock()
            mock_session = MagicMock()

            # First call: server doesn't have segment (pre-check)
            # Second call: server has segment (post-upload confirm)
            responses = [
                MagicMock(status_code=200, json=MagicMock(return_value=[])),
                MagicMock(
                    status_code=200,
                    json=MagicMock(
                        return_value=[
                            {
                                "key": "120000_300",
                                "files": [{"name": "audio.flac", "sha256": "abc123"}],
                            }
                        ]
                    ),
                ),
            ]
            mock_session.get.side_effect = responses
            mock_client.session = mock_session
            mock_client.upload_segment = MagicMock(return_value=UploadResult(True))
            mock_client_class.return_value = mock_client

            service = SyncService("https://server/ingest/key")
            service._process_segment(seg_info)

            # Verify upload was called with metadata
            mock_client.upload_segment.assert_called_once()
            call_kwargs = mock_client.upload_segment.call_args.kwargs
            assert call_kwargs["meta"] == {
                "host": "laptop",
                "platform": "linux",
                "facet": "meetings",
                "stream": "default",
            }


def test_handle_message_skips_zero_byte_files(sync_journal, monkeypatch):
    """Test that 0-byte files are skipped during message handling."""
    from observe.sync import SyncService, load_sync_state

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Create segment directory with mixed files
    seg_dir = journal / day / "default" / "120000_300"
    seg_dir.mkdir(parents=True, exist_ok=True)
    (seg_dir / "audio.flac").write_bytes(b"real audio data")
    (seg_dir / "screen.webm").write_bytes(b"")  # 0-byte

    with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"):
        service = SyncService("https://server/ingest/key")

        message = {
            "tract": "observe",
            "event": "observing",
            "day": day,
            "segment": "120000_300",
            "files": ["audio.flac", "screen.webm"],
            "stream": "default",
        }
        service._handle_message(message)

        # Only valid file should be queued
        assert service._queue.qsize() == 1
        seg_info = service._queue.get_nowait()
        assert len(seg_info.files) == 1
        assert seg_info.files[0]["name"] == "audio.flac"

        # Pending record should only have 1 file
        records = load_sync_state(day)
        assert len(records) == 1
        assert len(records[0]["files"]) == 1


def test_handle_message_skips_all_zero_byte_files(sync_journal, monkeypatch):
    """Test that segment is not queued when all files are 0-byte."""
    from observe.sync import SyncService, load_sync_state

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    # Create segment directory with only 0-byte files
    seg_dir = journal / day / "default" / "120000_300"
    seg_dir.mkdir(parents=True, exist_ok=True)
    (seg_dir / "audio.flac").write_bytes(b"")
    (seg_dir / "screen.webm").write_bytes(b"")

    with patch("observe.sync.RemoteClient"), patch("observe.sync.CallosumConnection"):
        service = SyncService("https://server/ingest/key")

        message = {
            "tract": "observe",
            "event": "observing",
            "day": day,
            "segment": "120000_300",
            "files": ["audio.flac", "screen.webm"],
            "stream": "default",
        }
        service._handle_message(message)

        # No segment should be queued
        assert service._queue.qsize() == 0

        # No pending record should be written
        records = load_sync_state(day)
        assert len(records) == 0


def test_process_segment_duplicate_skips_confirmation(sync_journal, monkeypatch):
    """Test that duplicate upload skips confirmation polling."""
    from observe.sync import SegmentInfo, SyncService, UploadResult, load_sync_state

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    seg_info = SegmentInfo(
        day=day,
        segment="120000_300",
        files=[
            {"name": "audio.flac", "sha256": "abc123"},
        ],
        meta={"stream": "default"},
    )

    # Create the segment file so cleanup has something to work with
    seg_dir = journal / day / "default" / "120000_300"
    seg_dir.mkdir(parents=True, exist_ok=True)
    (seg_dir / "audio.flac").write_bytes(b"audio data")

    with patch("observe.sync.CallosumConnection") as mock_callosum_class:
        mock_callosum = MagicMock()
        mock_callosum_class.return_value = mock_callosum

        with patch("observe.sync.RemoteClient") as mock_client_class:
            mock_client = MagicMock()
            mock_session = MagicMock()

            # Pre-check: server doesn't have segment yet
            mock_session.get.return_value = MagicMock(
                status_code=200, json=MagicMock(return_value=[])
            )
            mock_client.session = mock_session
            # Upload returns duplicate
            mock_client.upload_segment = MagicMock(
                return_value=UploadResult(True, duplicate=True)
            )
            mock_client_class.return_value = mock_client

            service = SyncService("https://server/ingest/key")
            service._process_segment(seg_info)

            # Upload should have been called
            mock_client.upload_segment.assert_called_once()

            # Confirmation polling should NOT have happened
            # (only 1 GET call for pre-check, no additional confirmation GETs)
            assert mock_session.get.call_count == 1

            # Confirmed record should have been written
            records = load_sync_state(day)
            confirmed = [r for r in records if r.get("status") == "confirmed"]
            assert len(confirmed) == 1
            assert confirmed[0]["segment"] == "120000_300"


def test_sync_service_stop_drains_queue(sync_journal, monkeypatch):
    """stop() should drain queued segments before returning."""
    from observe.sync import SegmentInfo, SyncService

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    with (
        patch("observe.sync.RemoteClient"),
        patch("observe.sync.CallosumConnection"),
    ):
        service = SyncService("https://server/ingest/key")
        # Prevent start() from running the real worker
        real_worker = service._sync_worker
        service._sync_worker = lambda: None
        service.start()

        assert service._worker_thread is not None
        service._worker_thread.join(timeout=1.0)
        assert not service._worker_thread.is_alive()

        service._queue.put(
            SegmentInfo(
                day=day, segment="120000_301", files=[{"name": "audio.flac"}], meta={}
            )
        )
        service._queue.put(
            SegmentInfo(
                day=day, segment="120000_302", files=[{"name": "audio.flac"}], meta={}
            )
        )

        # Restore the real worker and run it on a fresh thread so stop()
        # has two queued segments to drain.
        service._sync_worker = real_worker
        service._stop_event.clear()

        with patch.object(service, "_process_segment") as mock_process:
            mock_process.return_value = None

            service._worker_thread = threading.Thread(
                target=service._sync_worker, daemon=False
            )
            service._worker_thread.start()

            service.stop()

            assert mock_process.call_count == 2
            service._worker_thread.join(timeout=1.0)


def test_sync_service_stop_disconnects_callosum_first(sync_journal, monkeypatch):
    """stop() should stop callosum before setting the stop event."""
    from observe.sync import SyncService

    journal = sync_journal["path"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    mock_callosum = MagicMock()
    shutdown_order: list[str] = []
    service = SyncService("https://server/ingest/key")
    service._callosum = mock_callosum
    service._worker_thread = None
    service._stop_event = MagicMock()

    # Record the order in which stop() touches each collaborator.
    service._callosum.stop.side_effect = lambda: shutdown_order.append("callosum")
    service._stop_event.set.side_effect = lambda: shutdown_order.append("stop_event")

    service.stop()

    assert shutdown_order == ["callosum", "stop_event"]


def test_sync_service_worker_thread_not_daemon(sync_journal, monkeypatch):
    """Worker thread should be non-daemon."""
    from observe.sync import SyncService

    journal = sync_journal["path"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    with (
        patch("observe.sync.get_pending_segments", return_value=[]),
        patch("observe.sync.RemoteClient"),
        patch("observe.sync.CallosumConnection"),
    ):
        service = SyncService("https://server/ingest/key")
        service.start()

        try:
            assert service._worker_thread is not None
            assert service._worker_thread.daemon is False
        finally:
            # The real worker is running here and is expected to be
            # non-daemon; always stop it so a failed assertion cannot leave
            # a live thread that keeps the test process from exiting.
            service.stop()


def test_sync_service_drain_completes_current_segment(sync_journal, monkeypatch):
    """Current segment should finish when stop_event is set and draining is active."""
    from observe.sync import SegmentInfo, SyncService, UploadResult

    journal = sync_journal["path"]
    day = sync_journal["day"]
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))

    service = SyncService("https://server/ingest/key")
    service._client.upload_segment = MagicMock(return_value=UploadResult(True))
    # First confirmation check (pre-upload) fails, second (post-upload) passes.
    service._check_confirmation = MagicMock(side_effect=[False, True])

    segment_info = SegmentInfo(
        day=day,
        segment="120000_300",
        files=[{"name": "audio.flac", "sha256": "abc123"}],
        meta={"stream": "default"},
    )
    service._queue.put(segment_info)

    service._draining = True
    service._stop_event.set()

    def upload_side_effect(*_args, **_kwargs):
        service._stop_event.set()
        return UploadResult(True)

    service._client.upload_segment.side_effect = upload_side_effect

    service._process_segment(service._queue.get_nowait())

    assert service._client.upload_segment.call_count == 1
    assert not (journal / day / "default" / "120000_300" / "audio.flac").exists()


def test_main_sigterm_triggers_stop(monkeypatch):
    """main() should stop the service on SIGTERM."""
    import argparse

    import observe.sync

    handlers: dict[int, object] = {}

    def capture_signal(signum, handler):
        # Stand-in for signal.signal: remember the handler instead of
        # installing it, so the test can invoke it directly.
        handlers[signum] = handler
        return handler

    args = argparse.Namespace(remote="https://server/ingest/key", days_back=7)

    with (
        patch("observe.sync.setup_cli", return_value=args),
        patch("observe.sync.SyncService") as mock_service,
        patch("observe.sync.signal.signal", side_effect=capture_signal),
    ):
        service_instance = MagicMock()
        mock_service.return_value = service_instance

        # Daemon thread: if an assertion below fails before the handler is
        # invoked, main() is never told to exit and a non-daemon thread
        # would keep the test process alive.
        thread = threading.Thread(target=observe.sync.main, daemon=True)
        thread.start()

        # Monotonic clock so the polling deadline is immune to wall-clock
        # adjustments.
        deadline = time.monotonic() + 1.0
        while signal.SIGTERM not in handlers and time.monotonic() < deadline:
            time.sleep(0.01)
        assert signal.SIGTERM in handlers
        assert signal.SIGINT in handlers

        handlers[signal.SIGTERM](signal.SIGTERM, None)
        thread.join(timeout=1.0)

        assert thread.is_alive() is False
        service_instance.start.assert_called_once()
        service_instance.stop.assert_called_once()


class TestCheckRemoteHealth:
    """Tests for check_remote_health() function."""

    def test_health_check_success(self):
        """Test successful health check returns True with connection info."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_get.return_value = mock_response

            success, message = check_remote_health(
                "http://server.local:8000/app/remote/ingest/abc12345xyz"
            )

            assert success is True
            assert "server.local:8000" in message
            assert "abc12345" in message  # Key prefix

    def test_health_check_invalid_key(self):
        """Test 401 response returns False with appropriate message."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 401
            mock_get.return_value = mock_response

            success, message = check_remote_health(
                "http://server.local:8000/app/remote/ingest/badkey"
            )

            assert success is False
            assert "401" in message or "Invalid key" in message

    def test_health_check_revoked_key(self):
        """Test 403 response returns False with error details."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 403
            mock_response.text = '{"error": "Remote revoked"}'
            mock_response.json.return_value = {"error": "Remote revoked"}
            mock_get.return_value = mock_response

            success, message = check_remote_health(
                "http://server.local:8000/app/remote/ingest/revokedkey"
            )

            assert success is False
            assert "403" in message or "revoked" in message.lower()

    def test_health_check_403_non_json_body(self):
        """Test 403 with non-JSON body doesn't crash."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 403
            mock_response.text = "Forbidden"
            mock_response.json.side_effect = requests.exceptions.JSONDecodeError(
                "", "", 0
            )
            mock_get.return_value = mock_response

            success, message = check_remote_health(
                "http://server.local:8000/app/remote/ingest/badkey"
            )

            assert success is False
            assert "403" in message

    def test_health_check_connection_refused(self):
        """Test connection refused returns False with clear message."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_get.side_effect = requests.exceptions.ConnectionError(
                "Connection refused"
            )

            success, message = check_remote_health(
                "http://server.local:8000/app/remote/ingest/key123"
            )

            assert success is False
            assert "refused" in message.lower() or "connection" in message.lower()

    def test_health_check_timeout(self):
        """Test timeout returns False with timeout message."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_get.side_effect = requests.exceptions.Timeout("timed out")

            success, message = check_remote_health(
                "http://server.local:8000/app/remote/ingest/key123",
                timeout=5.0,
            )

            assert success is False
            assert "timeout" in message.lower()

    def test_health_check_host_not_found(self):
        """Test DNS failure returns False with host not found message."""
        from observe.sync import check_remote_health

        with patch("observe.sync.requests.get") as mock_get:
            mock_get.side_effect = requests.exceptions.ConnectionError(
                "Name or service not known"
            )

            success, message = check_remote_health(
                "http://nonexistent.invalid/app/remote/ingest/key123"
            )

            assert success is False
            assert "not found" in message.lower() or "connection" in message.lower()