# personal memory agent — test suite
# (source: raw view at main, 876 lines, 32 kB)
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for think.utils module."""

import argparse
import json
import os
import sys
import tempfile
from datetime import time
from pathlib import Path

import pytest

from think.entities import load_entity_names
from think.utils import (
    DEFAULT_STREAM,
    day_from_path,
    iter_segments,
    segment_parse,
    segment_key,
    setup_cli,
)


class TestDayFromPath:
    def test_file_in_segment(self):
        """Standard 3-level path: day/stream/segment/file."""
        p = Path("/journal/20260212/fedora/150304_300/audio.flac")
        assert day_from_path(p) == "20260212"

    def test_file_in_day(self):
        """File directly in day dir."""
        p = Path("/journal/20260212/somefile.txt")
        assert day_from_path(p) == "20260212"

    def test_day_dir_itself(self):
        """Path IS the day directory."""
        p = Path("/journal/20260212")
        assert day_from_path(p) == "20260212"

    def test_no_day_in_path(self):
        """Path with no YYYYMMDD ancestor returns None."""
        p = Path("/tmp/random/file.txt")
        assert day_from_path(p) is None

    def test_segment_dir(self):
        """Segment directory (no file)."""
        p = Path("/journal/20260212/default/150304_300")
        assert day_from_path(p) == "20260212"


def setup_entities_new_structure(
    journal_path: Path,
    facet: str,
    entities: list[tuple[str, str, str]] | list[dict],
):
    """Helper to set up entities using the new structure for tests.

    Creates both journal-level entity files and facet relationship files.

    Args:
        journal_path: Path to journal root
        facet: Facet name (e.g., "test")
        entities: Either list of (type, name, desc) tuples or list of entity dicts
    """
    from slugify import slugify

    for item in entities:
        if isinstance(item, dict):
            etype = item.get("type", "")
            name = item.get("name", "")
            desc = item.get("description", "")
            aka = item.get("aka", [])
        else:
            etype, name, desc = item
            aka = []

        entity_id = slugify(name, separator="_")
        if not entity_id:
            continue

        # Create journal-level entity
        journal_entity_dir = journal_path / "entities" / entity_id
        journal_entity_dir.mkdir(parents=True, exist_ok=True)
        journal_entity = {"id": entity_id, "name": name, "type": etype}
        if aka:
            journal_entity["aka"] = aka
        with open(journal_entity_dir / "entity.json", "w", encoding="utf-8") as f:
            json.dump(journal_entity, f)

        # Create facet relationship
        facet_entity_dir = journal_path / "facets" / facet / "entities" / entity_id
        facet_entity_dir.mkdir(parents=True, exist_ok=True)
        relationship = {"entity_id": entity_id, "description": desc}
        with open(facet_entity_dir / "entity.json", "w", encoding="utf-8") as f:
            json.dump(relationship, f)


def test_load_entity_names_with_valid_file(monkeypatch):
    """Test loading entity names from entities."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "John Smith", "A software engineer at Google"),
                ("Company", "Acme Corp", "Technology company based in SF"),
                ("Project", "Project X", "Secret internal project"),
                ("Tool", "Hammer", "For hitting things"),
                ("Person", "Jane Doe", "Product manager at Meta"),
                ("Company", "Widget Inc", "Manufacturing company"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()

        # Check that names are extracted without duplicates
        names = result.split("; ")
        assert len(names) == 6
        assert "John Smith" in names
        assert "Acme Corp" in names
        assert "Project X" in names
        assert "Hammer" in names
        assert "Jane Doe" in names
        assert "Widget Inc" in names


def test_load_entity_names_missing_file(monkeypatch):
    """Test that missing file returns None."""
    with tempfile.TemporaryDirectory() as tmpdir:
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()
        assert result is None


def test_load_entity_names_empty_facet(monkeypatch):
    """Test that empty facet returns None."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create facet directory but no entities
        facet_dir = Path(tmpdir) / "facets" / "test"
        facet_dir.mkdir(parents=True, exist_ok=True)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()
        assert result is None


def test_load_entity_names_no_valid_entries(monkeypatch):
    """Test empty entities directory returns None."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create entities directory but no entity subdirectories
        entities_dir = Path(tmpdir) / "facets" / "test" / "entities"
        entities_dir.mkdir(parents=True, exist_ok=True)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()
        assert result is None


def test_load_entity_names_with_duplicates(monkeypatch):
    """Test that duplicate names are filtered out (by entity id)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # With new structure, same entity_id means same entity
        # Can't have true duplicates - just test two entities
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "John Smith", "Engineer"),
                ("Company", "Acme Corp", "Tech company"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()

        names = result.split("; ")
        assert len(names) == 2
        assert "John Smith" in names
        assert "Acme Corp" in names


def test_load_entity_names_handles_special_characters(monkeypatch):
    """Test that names with special characters are handled correctly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "Jean-Pierre O'Malley", "Engineer"),
                ("Company", "AT&T", "Telecom company"),
                ("Project", "C++ Compiler", "Development tool"),
                ("Tool", "Node.js", "JavaScript runtime"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()
        assert "Jean-Pierre O'Malley" in result
        assert "AT&T" in result
        assert "C++ Compiler" in result
        assert "Node.js" in result


def test_load_entity_names_with_env_var(monkeypatch):
    """Test loading using _SOLSTONE_JOURNAL_OVERRIDE environment variable."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [("Person", "Test User", "A test person")],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)

        # Should use env var
        result = load_entity_names()
        assert result == "Test User"


def test_load_entity_names_empty_journal(tmp_path, monkeypatch):
    """Test that empty journal directory returns None."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    result = load_entity_names()
    assert result is None


def test_load_entity_names_spoken_mode(monkeypatch):
    """Test spoken mode returns shortened forms with uniform processing for all types."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "Jeremie Miller (Jer)", "Software engineer"),
                ("Person", "Jane Elizabeth Doe", "Product manager"),
                ("Company", "Acme Corporation (ACME)", "Tech company"),
                ("Company", "Widget Inc", "Manufacturing company"),
                ("Company", "Google", "Search engine"),
                ("Project", "solstone Project (SUN)", "AI journaling"),
                ("Project", "Project X", "Secret project"),
                ("Tool", "Hammer", "For hitting things"),
                ("Tool", "Docker", "Container runtime"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

        # Should return a list, not a string
        assert isinstance(result, list)

        # Person: "Jeremie Miller (Jer)" -> ["Jeremie", "Jer"]
        assert "Jeremie" in result
        assert "Jer" in result

        # Person: "Jane Elizabeth Doe" -> ["Jane"]
        assert "Jane" in result
        # Should not include middle/last names
        assert "Elizabeth" not in result
        assert "Doe" not in result

        # Company: "Acme Corporation (ACME)" -> ["Acme", "ACME"] (uniform processing)
        assert "Acme" in result  # First word
        assert "ACME" in result  # From parens

        # Company: "Widget Inc" (multi-word) -> ["Widget"]
        assert "Widget" in result

        # Company: "Google" (single word) -> ["Google"]
        assert "Google" in result

        # Project: "solstone Project (SUN)" -> ["solstone", "SUN"] (uniform processing)
        assert "solstone" in result  # First word
        assert "SUN" in result  # From parens

        # Project: "Project X" (no parens) -> ["Project"] (first word only)
        assert "Project" in result

        # Tools are now included (uniform processing for all types)
        assert "Hammer" in result
        assert "Docker" in result


def test_load_entity_names_spoken_mode_with_tools(monkeypatch):
    """Test spoken mode includes tools with uniform processing."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Tool", "Hammer", "For hitting things"),
                ("Tool", "Docker", "Container runtime"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)
        # Tools are now included (uniform processing)
        assert isinstance(result, list)
        assert "Hammer" in result
        assert "Docker" in result


def test_load_entity_names_spoken_mode_duplicates(monkeypatch):
    """Test spoken mode filters out duplicate shortened forms."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "John Smith", "Engineer"),
                ("Person", "John Doe", "Manager"),
                ("Company", "Acme Corp", "Tech"),
                ("Company", "Acme Industries", "Manufacturing"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

        # Should have only one "John" and one "Acme" even though there are two of each
        assert result.count("John") == 1
        assert result.count("Acme") == 1


def test_load_entity_names_uniform_processing(monkeypatch):
    """Test that uniform processing works correctly for all entity types."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "Ryan Reed (R2)", "Software developer"),
                (
                    "Company",
                    "Federal Aviation Administration (FAA)",
                    "Government agency",
                ),
                ("Project", "Backend API (API)", "Core service"),
                ("Tool", "pytest", "Testing framework"),
                ("Location", "New York City (NYC)", "Metropolitan area"),
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

        assert isinstance(result, list)

        # "Ryan Reed (R2)" -> ["Ryan", "R2"] (digits allowed if has letter)
        assert "Ryan" in result
        assert "R2" in result
        assert "Reed" not in result

        # "Federal Aviation Administration (FAA)" -> ["Federal", "FAA"]
        assert "Federal" in result
        assert "FAA" in result
        assert "Aviation" not in result
        assert "Administration" not in result

        # "Backend API (API)" -> ["Backend", "API"]
        assert "Backend" in result
        assert "API" in result

        # "pytest" -> ["pytest"]
        assert "pytest" in result

        # "New York City (NYC)" -> ["New", "NYC"]
        assert "New" in result
        assert "NYC" in result
        assert "York" not in result
        assert "City" not in result


def test_load_entity_names_with_aka_field(monkeypatch):
    """Test that aka field values are included in spoken mode."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                {
                    "type": "Person",
                    "name": "Alice Johnson",
                    "description": "Lead engineer",
                    "aka": ["Ali", "AJ"],
                },
                {
                    "type": "Company",
                    "name": "PostgreSQL",
                    "description": "Database system",
                    "aka": ["Postgres", "PG"],
                },
                {
                    "type": "Tool",
                    "name": "Docker Container (Docker)",
                    "description": "Container runtime",
                    "aka": ["Dock"],
                },
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

        assert isinstance(result, list)

        # Main name: "Alice Johnson" -> ["Alice"]
        assert "Alice" in result
        # aka entries: ["Ali", "AJ"]
        assert "Ali" in result
        assert "AJ" in result

        # Main name: "PostgreSQL" -> ["PostgreSQL"]
        assert "PostgreSQL" in result
        # aka entries: ["Postgres", "PG"]
        assert "Postgres" in result
        assert "PG" in result

        # Main name: "Docker Container (Docker)" -> ["Docker", "Docker"]
        # aka entries: ["Dock"]
        assert "Docker" in result
        assert "Dock" in result
        # Should be deduplicated - only one "Docker"
        assert result.count("Docker") == 1


def test_load_entity_names_aka_with_parens(monkeypatch):
    """Test that aka entries with parentheses are processed correctly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                {
                    "type": "Person",
                    "name": "Robert Smith",
                    "description": "Manager",
                    "aka": ["Bob Smith (Bobby)", "Rob"],
                },
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

        assert isinstance(result, list)

        # Main name: "Robert Smith" -> ["Robert"]
        assert "Robert" in result

        # aka entry: "Bob Smith (Bobby)" -> ["Bob", "Bobby"]
        assert "Bob" in result
        assert "Bobby" in result

        # aka entry: "Rob" -> ["Rob"]
        assert "Rob" in result


def test_load_entity_names_aka_deduplication(monkeypatch):
    """Test that aka values are deduplicated with main names."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                # First entity has "John" in aka
                {
                    "type": "Person",
                    "name": "Alice",
                    "description": "Person 1",
                    "aka": ["John"],
                },
                # Second entity has "John" as main name
                {"type": "Person", "name": "John Smith", "description": "Person 2"},
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

        # Should have only one "John" even though it appears in aka and as main name
        assert result.count("John") == 1
        assert "Alice" in result


def test_load_entity_names_non_spoken_with_aka(monkeypatch):
    """Test non-spoken mode includes aka values in parentheses."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                # Entity with aka values
                {
                    "type": "Person",
                    "name": "Alice Johnson",
                    "description": "Lead engineer",
                    "aka": ["Ali", "AJ"],
                },
                # Entity without aka
                {
                    "type": "Company",
                    "name": "TechCorp",
                    "description": "Tech company",
                },
                # Entity with multiple aka
                {
                    "type": "Tool",
                    "name": "PostgreSQL",
                    "description": "Database",
                    "aka": ["Postgres", "PG"],
                },
            ],
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=False)

        # Check all entities are present with their aka
        assert "Alice Johnson (Ali, AJ)" in result
        assert "TechCorp" in result
        assert "PostgreSQL (Postgres, PG)" in result


class TestTruncatedEcho:
    """Tests for truncated_echo output helper."""

    def test_under_limit_passes_through(self, capsys):
        """Text under the limit is printed without truncation."""
        from think.utils import truncated_echo

        truncated_echo("hello world", max_bytes=1024)
        captured = capsys.readouterr()
        assert captured.out == "hello world\n"
        assert captured.err == ""

    def test_over_limit_truncates_and_warns(self, capsys):
        """Text over the limit is truncated with stderr warning."""
        from think.utils import truncated_echo

        text = "a" * 200
        truncated_echo(text, max_bytes=50)
        captured = capsys.readouterr()
        # stdout should have exactly 50 bytes of content + newline
        assert captured.out == "a" * 50 + "\n"
        assert "truncated" in captured.err
        assert "200" in captured.err
        assert "50" in captured.err

    def test_zero_means_unlimited(self, capsys):
        """max_bytes=0 disables truncation."""
        from think.utils import truncated_echo

        text = "b" * 100_000
        truncated_echo(text, max_bytes=0)
        captured = capsys.readouterr()
        assert captured.out == text + "\n"
        assert captured.err == ""

    def test_utf8_boundary_safe(self, capsys):
        """Truncation at a multibyte UTF-8 boundary drops partial chars."""
        from think.utils import truncated_echo

        # Each emoji is 4 bytes in UTF-8
        text = "\U0001f600" * 10  # 40 bytes total
        truncated_echo(text, max_bytes=6)  # mid-second emoji
        captured = capsys.readouterr()
        # Should get only the first complete emoji (4 bytes) since bytes 5-6
        # form an incomplete character that gets dropped by errors="ignore"
        assert captured.out == "\U0001f600\n"
        assert "truncated" in captured.err

    def test_exact_limit_no_truncation(self, capsys):
        """Text exactly at the byte limit is not truncated."""
        from think.utils import truncated_echo

        text = "x" * 100
        truncated_echo(text, max_bytes=100)
        captured = capsys.readouterr()
        assert captured.out == text + "\n"
        assert captured.err == ""


def test_segment_key_hhmmss_with_duration():
    """Test segment_key with HHMMSS_LEN format."""
    assert segment_key("143022_300") == "143022_300"
    assert segment_key("095604_303") == "095604_303"
    assert segment_key("120000_3600") == "120000_3600"
    assert segment_key("000000_1") == "000000_1"


def test_segment_key_hhmmss_len_with_suffix():
    """Test segment_key with HHMMSS_LEN_suffix format."""
    assert segment_key("143022_300_audio") == "143022_300"
    assert segment_key("095604_303_screen") == "095604_303"
    assert segment_key("120000_3600_recording") == "120000_3600"
    assert segment_key("000000_1_mic_sys") == "000000_1"


def test_segment_key_with_file_extension():
    """Test segment_key with various file extensions."""
    assert segment_key("143022_300_audio.flac") == "143022_300"
    assert segment_key("095604_303_screen.webm") == "095604_303"
    assert segment_key("143022_300.jsonl") == "143022_300"


def test_segment_key_in_path():
    """Test segment_key extraction from full paths."""
    assert segment_key("/journal/20250109/143022_300/audio.jsonl") == "143022_300"
    assert segment_key("/home/user/20250110/095604_303_screen.webm") == "095604_303"
    assert segment_key("20250110/143022_300_audio.flac") == "143022_300"


def test_segment_key_invalid_formats():
    """Test segment_key with invalid formats returns None."""
    assert segment_key("invalid") is None
    assert segment_key("12345") is None  # Too short
    assert segment_key("1234567") is None  # Too long
    assert segment_key("abcdef") is None  # Not digits
    assert segment_key("14:30:22") is None  # Wrong separator
    assert segment_key("") is None
    assert segment_key("_143022") is None
    # Legacy formats without duration now return None
    assert segment_key("143022") is None
    assert segment_key("143022_audio") is None
    assert segment_key("143022_screen") is None


def test_segment_key_edge_cases():
    """Test segment_key with edge cases."""
    # Multiple underscores in suffix
    assert segment_key("143022_300_mic_sys_audio") == "143022_300"
    # Segment key with non-word boundary prefix (should not match)
    assert segment_key("prefix_143022_300_suffix") is None
    # Segment key with space/path separator (word boundary - should match)
    assert segment_key("prefix/143022_300/suffix") == "143022_300"
    assert segment_key("prefix 143022_300 suffix") == "143022_300"
    # Multiple potential matches (should match first)
    assert segment_key("143022_300 and 150000_600") == "143022_300"


def test_segment_parse_clamps_midnight_crossing():
    """Test segment_parse clamps end time when a segment crosses midnight."""
    assert segment_parse("235900_300") == (time(23, 59, 0), time(23, 59, 59))
    assert segment_parse("143022_300") == (time(14, 30, 22), time(14, 35, 22))


class TestSetupCliConfigEnv:
    """Tests for config env injection via setup_cli()."""

    @pytest.fixture
    def cli_env(self, monkeypatch, tmp_path):
        """Set up a journal with config and mock sys.argv for setup_cli tests.

        Returns a helper function to write config and run setup_cli.
        """
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
        monkeypatch.setattr(sys, "argv", ["test"])

        def write_config_and_run(config: dict | None = None):
            """Write config to journal and run setup_cli."""
            if config is not None:
                config_dir = tmp_path / "config"
                config_dir.mkdir(exist_ok=True)
                config_file = config_dir / "journal.json"
                config_file.write_text(json.dumps(config))

            parser = argparse.ArgumentParser()
            setup_cli(parser)

        return write_config_and_run

    def test_config_env_injected_into_os_environ(self, monkeypatch, cli_env):
        """Test that config env values are injected into os.environ."""
        monkeypatch.delenv("TEST_API_KEY", raising=False)
        monkeypatch.delenv("ANOTHER_VAR", raising=False)

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {
                    "TEST_API_KEY": "from_config",
                    "ANOTHER_VAR": "also_from_config",
                },
            }
        )

        assert os.environ.get("TEST_API_KEY") == "from_config"
        assert os.environ.get("ANOTHER_VAR") == "also_from_config"

    def test_journal_config_overrides_shell_env(self, monkeypatch, cli_env):
        """Test that journal.json config is the strict source for env vars."""
        monkeypatch.setenv("EXISTING_VAR", "from_shell")

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {"EXISTING_VAR": "from_config"},
            }
        )

        assert os.environ.get("EXISTING_VAR") == "from_config"

    def test_empty_shell_env_allows_config_override(self, monkeypatch, cli_env):
        """Test that empty shell env values are overridden by config."""
        monkeypatch.setenv("EMPTY_VAR", "")

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {"EMPTY_VAR": "from_config"},
            }
        )

        assert os.environ.get("EMPTY_VAR") == "from_config"

    def test_missing_env_section_is_safe(self, cli_env):
        """Test that missing env section in config doesn't cause errors."""
        cli_env({"identity": {"name": "Test"}})

    def test_missing_config_file_is_safe(self, cli_env):
        """Test that missing config file doesn't cause errors."""
        cli_env(None)  # No config file

    def test_config_env_converts_non_string_values(self, monkeypatch, cli_env):
        """Test that non-string config values are converted to strings."""
        monkeypatch.delenv("INT_VAR", raising=False)
        monkeypatch.delenv("BOOL_VAR", raising=False)

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {
                    "INT_VAR": 42,
                    "BOOL_VAR": True,
                },
            }
        )

        assert os.environ.get("INT_VAR") == "42"
        assert os.environ.get("BOOL_VAR") == "True"


class TestPortDiscovery:
    """Tests for service port discovery utilities."""

    def test_find_available_port_returns_valid_port(self):
        """Test that find_available_port returns a valid port number."""
        from think.utils import find_available_port

        port = find_available_port()
        assert isinstance(port, int)
        assert 1024 <= port <= 65535  # User-space port range

    def test_find_available_port_different_each_call(self):
        """Test that multiple calls can return different ports."""
        from think.utils import find_available_port

        # Get multiple ports - they may or may not be unique, but should all be valid
        ports = [find_available_port() for _ in range(3)]
        for port in ports:
            assert isinstance(port, int)
            assert 1024 <= port <= 65535

    def test_write_and_read_service_port(self, monkeypatch, tmp_path):
        """Test writing and reading a service port file."""
        from think.utils import read_service_port, write_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        # Write port
        write_service_port("test_service", 12345)

        # Read port back
        port = read_service_port("test_service")
        assert port == 12345

        # Verify file exists in correct location
        port_file = tmp_path / "health" / "test_service.port"
        assert port_file.exists()
        assert port_file.read_text() == "12345"

    def test_read_service_port_missing_file(self, monkeypatch, tmp_path):
        """Test that reading missing port file returns None."""
        from think.utils import read_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        port = read_service_port("nonexistent")
        assert port is None

    def test_read_service_port_invalid_content(self, monkeypatch, tmp_path):
        """Test that reading invalid port file content returns None."""
        from think.utils import read_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        # Create port file with invalid content
        health_dir = tmp_path / "health"
        health_dir.mkdir()
        port_file = health_dir / "bad_service.port"
        port_file.write_text("not a number")

        port = read_service_port("bad_service")
        assert port is None

    def test_write_service_port_creates_health_dir(self, monkeypatch, tmp_path):
        """Test that write_service_port creates health directory if needed."""
        from think.utils import write_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        # Health dir doesn't exist yet
        health_dir = tmp_path / "health"
        assert not health_dir.exists()

        write_service_port("new_service", 9999)

        # Now it should exist
        assert health_dir.exists()
        assert (health_dir / "new_service.port").read_text() == "9999"


class TestIterSegments:
    def test_skips_health_directory(self, tmp_path):
        """iter_segments does not return segments from health/ dirs."""
        day_dir = tmp_path / "20240101"
        day_dir.mkdir()
        health_seg = day_dir / "health" / "120000_300"
        health_seg.mkdir(parents=True)
        normal_seg = day_dir / "default" / "130000_300"
        normal_seg.mkdir(parents=True)

        results = iter_segments(day_dir)
        stream_names = [r[0] for r in results]
        assert "health" not in stream_names
        assert "default" in stream_names

    def test_toplevel_segments_as_default_stream(self, tmp_path):
        """Top-level segment dirs are returned with _default stream name."""
        day_dir = tmp_path / "20240101"
        day_dir.mkdir()
        toplevel_seg = day_dir / "143022_300"
        toplevel_seg.mkdir()
        normal_seg = day_dir / "default" / "150000_300"
        normal_seg.mkdir(parents=True)

        results = iter_segments(day_dir)
        assert len(results) == 2
        default_results = [(s, k, p) for s, k, p in results if s == DEFAULT_STREAM]
        assert len(default_results) == 1
        assert default_results[0][1] == "143022_300"
        normal_results = [(s, k, p) for s, k, p in results if s == "default"]
        assert len(normal_results) == 1

    def test_normal_stream_discovery_unchanged(self, tmp_path):
        """Normal stream/segment discovery still works correctly."""
        day_dir = tmp_path / "20240101"
        day_dir.mkdir()
        (day_dir / "default" / "100000_300").mkdir(parents=True)
        (day_dir / "default" / "110000_300").mkdir(parents=True)
        (day_dir / "import.apple" / "120000_600").mkdir(parents=True)

        results = iter_segments(day_dir)
        assert len(results) == 3
        assert results[0][1] == "100000_300"
        assert results[1][1] == "110000_300"
        assert results[2][1] == "120000_600"
        assert results[0][0] == "default"
        assert results[2][0] == "import.apple"