# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for think.utils module."""
5
6import argparse
7import json
8import os
9import sys
10import tempfile
11from datetime import time
12from pathlib import Path
13
14import pytest
15
16from think.entities import load_entity_names
17from think.utils import (
18 DEFAULT_STREAM,
19 day_from_path,
20 iter_segments,
21 segment_parse,
22 segment_key,
23 setup_cli,
24)
25
26
class TestDayFromPath:
    """day_from_path extracts the YYYYMMDD day component from journal paths."""

    def test_file_in_segment(self):
        """Standard 3-level path: day/stream/segment/file."""
        assert (
            day_from_path(Path("/journal/20260212/fedora/150304_300/audio.flac"))
            == "20260212"
        )

    def test_file_in_day(self):
        """File directly in day dir."""
        assert day_from_path(Path("/journal/20260212/somefile.txt")) == "20260212"

    def test_day_dir_itself(self):
        """Path IS the day directory."""
        assert day_from_path(Path("/journal/20260212")) == "20260212"

    def test_no_day_in_path(self):
        """Path with no YYYYMMDD ancestor returns None."""
        assert day_from_path(Path("/tmp/random/file.txt")) is None

    def test_segment_dir(self):
        """Segment directory (no file)."""
        assert day_from_path(Path("/journal/20260212/default/150304_300")) == "20260212"
52
53
def setup_entities_new_structure(
    journal_path: Path,
    facet: str,
    entities: list[tuple[str, str, str]] | list[dict],
):
    """Helper to set up entities using the new structure for tests.

    Creates both journal-level entity files and facet relationship files.

    Args:
        journal_path: Path to journal root
        facet: Facet name (e.g., "test")
        entities: Either list of (type, name, desc) tuples or list of entity dicts
    """
    from slugify import slugify

    for item in entities:
        if isinstance(item, dict):
            etype = item.get("type", "")
            name = item.get("name", "")
            desc = item.get("description", "")
            aka = item.get("aka", [])
        else:
            etype, name, desc = item
            aka = []

        entity_id = slugify(name, separator="_")
        if not entity_id:
            # Names that slugify to nothing cannot form a directory name.
            continue

        # Journal-level entity record.
        entity_record = {"id": entity_id, "name": name, "type": etype}
        if aka:
            entity_record["aka"] = aka
        entity_dir = journal_path / "entities" / entity_id
        entity_dir.mkdir(parents=True, exist_ok=True)
        (entity_dir / "entity.json").write_text(
            json.dumps(entity_record), encoding="utf-8"
        )

        # Facet-level relationship record.
        rel_dir = journal_path / "facets" / facet / "entities" / entity_id
        rel_dir.mkdir(parents=True, exist_ok=True)
        (rel_dir / "entity.json").write_text(
            json.dumps({"entity_id": entity_id, "description": desc}),
            encoding="utf-8",
        )
99
100
def test_load_entity_names_with_valid_file(monkeypatch):
    """Test loading entity names from entities."""
    seed = [
        ("Person", "John Smith", "A software engineer at Google"),
        ("Company", "Acme Corp", "Technology company based in SF"),
        ("Project", "Project X", "Secret internal project"),
        ("Tool", "Hammer", "For hitting things"),
        ("Person", "Jane Doe", "Product manager at Meta"),
        ("Company", "Widget Inc", "Manufacturing company"),
    ]
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(Path(tmpdir), "test", seed)
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        names = load_entity_names().split("; ")

    # Every seeded name appears, with no duplicates inflating the count.
    assert len(names) == len(seed)
    for _etype, name, _desc in seed:
        assert name in names
129
130
def test_load_entity_names_missing_file(monkeypatch):
    """Test that missing file returns None."""
    with tempfile.TemporaryDirectory() as empty_journal:
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", empty_journal)
        assert load_entity_names() is None
137
138
def test_load_entity_names_empty_facet(monkeypatch):
    """Test that empty facet returns None."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Facet directory exists but contains no entities.
        (Path(tmpdir) / "facets" / "test").mkdir(parents=True, exist_ok=True)
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        assert load_entity_names() is None
149
150
def test_load_entity_names_no_valid_entries(monkeypatch):
    """Test empty entities directory returns None."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Entities directory exists but has no entity subdirectories.
        (Path(tmpdir) / "facets" / "test" / "entities").mkdir(
            parents=True, exist_ok=True
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        assert load_entity_names() is None
161
162
def test_load_entity_names_with_duplicates(monkeypatch):
    """Test that duplicate names are filtered out (by entity id)."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # With the new structure the entity_id is the identity, so true
        # duplicates cannot exist — just verify two distinct entities.
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "John Smith", "Engineer"),
                ("Company", "Acme Corp", "Tech company"),
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        names = load_entity_names().split("; ")

    assert len(names) == 2
    assert "John Smith" in names
    assert "Acme Corp" in names
184
185
def test_load_entity_names_handles_special_characters(monkeypatch):
    """Test that names with special characters are handled correctly."""
    fancy = [
        ("Person", "Jean-Pierre O'Malley", "Engineer"),
        ("Company", "AT&T", "Telecom company"),
        ("Project", "C++ Compiler", "Development tool"),
        ("Tool", "Node.js", "JavaScript runtime"),
    ]
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(Path(tmpdir), "test", fancy)
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names()

    # Punctuation-heavy names must survive round-tripping intact.
    for _etype, name, _desc in fancy:
        assert name in result
206
207
def test_load_entity_names_with_env_var(monkeypatch):
    """Test loading using _SOLSTONE_JOURNAL_OVERRIDE environment variable."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir), "test", [("Person", "Test User", "A test person")]
        )
        # The env var should steer loading at the temporary journal.
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        assert load_entity_names() == "Test User"
222
223
def test_load_entity_names_empty_journal(tmp_path, monkeypatch):
    """Test that empty journal directory returns None."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    assert load_entity_names() is None
230
231
def test_load_entity_names_spoken_mode(monkeypatch):
    """Test spoken mode returns shortened forms with uniform processing for all types."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "Jeremie Miller (Jer)", "Software engineer"),
                ("Person", "Jane Elizabeth Doe", "Product manager"),
                ("Company", "Acme Corporation (ACME)", "Tech company"),
                ("Company", "Widget Inc", "Manufacturing company"),
                ("Company", "Google", "Search engine"),
                ("Project", "solstone Project (SUN)", "AI journaling"),
                ("Project", "Project X", "Secret project"),
                ("Tool", "Hammer", "For hitting things"),
                ("Tool", "Docker", "Container runtime"),
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    # Spoken mode yields a list rather than a joined string.
    assert isinstance(result, list)

    expected_present = [
        "Jeremie", "Jer",    # "Jeremie Miller (Jer)" -> first name + parens form
        "Jane",              # "Jane Elizabeth Doe" -> first name only
        "Acme", "ACME",      # company: first word + parens (uniform processing)
        "Widget",            # multi-word company -> first word
        "Google",            # single-word company kept whole
        "solstone", "SUN",   # project: first word + parens
        "Project",           # "Project X" has no parens -> first word only
        "Hammer", "Docker",  # tools included via uniform processing
    ]
    for token in expected_present:
        assert token in result

    # Middle and last names must not leak through.
    for token in ("Elizabeth", "Doe"):
        assert token not in result
287
288
def test_load_entity_names_spoken_mode_with_tools(monkeypatch):
    """Test spoken mode includes tools with uniform processing."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Tool", "Hammer", "For hitting things"),
                ("Tool", "Docker", "Container runtime"),
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    # Tools get the same uniform treatment as every other entity type.
    assert isinstance(result, list)
    for tool in ("Hammer", "Docker"):
        assert tool in result
307
308
def test_load_entity_names_spoken_mode_duplicates(monkeypatch):
    """Test spoken mode filters out duplicate shortened forms."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "John Smith", "Engineer"),
                ("Person", "John Doe", "Manager"),
                ("Company", "Acme Corp", "Tech"),
                ("Company", "Acme Industries", "Manufacturing"),
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    # Two Johns and two Acmes must collapse to one shortened form each.
    for shortened in ("John", "Acme"):
        assert result.count(shortened) == 1
329
330
def test_load_entity_names_uniform_processing(monkeypatch):
    """Test that uniform processing works correctly for all entity types."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                ("Person", "Ryan Reed (R2)", "Software developer"),
                (
                    "Company",
                    "Federal Aviation Administration (FAA)",
                    "Government agency",
                ),
                ("Project", "Backend API (API)", "Core service"),
                ("Tool", "pytest", "Testing framework"),
                ("Location", "New York City (NYC)", "Metropolitan area"),
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    assert isinstance(result, list)

    # First word plus any parenthesized alias survives; later words drop.
    # "R2" is kept because digits are allowed when a letter is present.
    kept = ("Ryan", "R2", "Federal", "FAA", "Backend", "API", "pytest", "New", "NYC")
    for token in kept:
        assert token in result

    dropped = ("Reed", "Aviation", "Administration", "York", "City")
    for token in dropped:
        assert token not in result
378
379
def test_load_entity_names_with_aka_field(monkeypatch):
    """Test that aka field values are included in spoken mode."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                {
                    "type": "Person",
                    "name": "Alice Johnson",
                    "description": "Lead engineer",
                    "aka": ["Ali", "AJ"],
                },
                {
                    "type": "Company",
                    "name": "PostgreSQL",
                    "description": "Database system",
                    "aka": ["Postgres", "PG"],
                },
                {
                    "type": "Tool",
                    "name": "Docker Container (Docker)",
                    "description": "Container runtime",
                    "aka": ["Dock"],
                },
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    assert isinstance(result, list)

    # Shortened main names plus every aka entry should all be present:
    # "Alice Johnson" -> Alice; "PostgreSQL" -> PostgreSQL;
    # "Docker Container (Docker)" -> Docker.
    for token in ("Alice", "Ali", "AJ", "PostgreSQL", "Postgres", "PG", "Docker", "Dock"):
        assert token in result

    # "Docker" arrives via both the parens form and the first word, but is
    # deduplicated to a single occurrence.
    assert result.count("Docker") == 1
431
432
def test_load_entity_names_aka_with_parens(monkeypatch):
    """Test that aka entries with parentheses are processed correctly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                {
                    "type": "Person",
                    "name": "Robert Smith",
                    "description": "Manager",
                    "aka": ["Bob Smith (Bobby)", "Rob"],
                },
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    assert isinstance(result, list)

    # Main name -> "Robert"; aka "Bob Smith (Bobby)" -> "Bob" + "Bobby";
    # plain aka "Rob" passes through unchanged.
    for token in ("Robert", "Bob", "Bobby", "Rob"):
        assert token in result
463
464
def test_load_entity_names_aka_deduplication(monkeypatch):
    """Test that aka values are deduplicated with main names."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                # "John" appears as an aka here...
                {
                    "type": "Person",
                    "name": "Alice",
                    "description": "Person 1",
                    "aka": ["John"],
                },
                # ...and as a main name here.
                {"type": "Person", "name": "John Smith", "description": "Person 2"},
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=True)

    # The overlap must collapse to a single "John".
    assert result.count("John") == 1
    assert "Alice" in result
490
491
def test_load_entity_names_non_spoken_with_aka(monkeypatch):
    """Test non-spoken mode includes aka values in parentheses."""
    with tempfile.TemporaryDirectory() as tmpdir:
        setup_entities_new_structure(
            Path(tmpdir),
            "test",
            [
                # Entity with aka values.
                {
                    "type": "Person",
                    "name": "Alice Johnson",
                    "description": "Lead engineer",
                    "aka": ["Ali", "AJ"],
                },
                # Entity without aka.
                {
                    "type": "Company",
                    "name": "TechCorp",
                    "description": "Tech company",
                },
                # Entity with multiple aka.
                {
                    "type": "Tool",
                    "name": "PostgreSQL",
                    "description": "Database",
                    "aka": ["Postgres", "PG"],
                },
            ],
        )
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir)
        result = load_entity_names(spoken=False)

    # aka values render as a parenthesized, comma-separated suffix.
    for rendered in (
        "Alice Johnson (Ali, AJ)",
        "TechCorp",
        "PostgreSQL (Postgres, PG)",
    ):
        assert rendered in result
529
530
class TestTruncatedEcho:
    """Tests for truncated_echo output helper."""

    def test_under_limit_passes_through(self, capsys):
        """Text under the limit is printed without truncation."""
        from think.utils import truncated_echo

        truncated_echo("hello world", max_bytes=1024)
        out, err = capsys.readouterr()
        assert out == "hello world\n"
        assert err == ""

    def test_over_limit_truncates_and_warns(self, capsys):
        """Text over the limit is truncated with stderr warning."""
        from think.utils import truncated_echo

        truncated_echo("a" * 200, max_bytes=50)
        out, err = capsys.readouterr()
        # stdout carries exactly 50 bytes of payload plus the newline.
        assert out == "a" * 50 + "\n"
        # The warning names the truncation and both byte counts.
        for needle in ("truncated", "200", "50"):
            assert needle in err

    def test_zero_means_unlimited(self, capsys):
        """max_bytes=0 disables truncation."""
        from think.utils import truncated_echo

        payload = "b" * 100_000
        truncated_echo(payload, max_bytes=0)
        out, err = capsys.readouterr()
        assert out == payload + "\n"
        assert err == ""

    def test_utf8_boundary_safe(self, capsys):
        """Truncation at a multibyte UTF-8 boundary drops partial chars."""
        from think.utils import truncated_echo

        # Each emoji encodes to 4 bytes, so a 6-byte limit lands mid-emoji.
        truncated_echo("\U0001f600" * 10, max_bytes=6)
        out, err = capsys.readouterr()
        # Bytes 5-6 form an incomplete character that errors="ignore" drops,
        # leaving just the first complete emoji.
        assert out == "\U0001f600\n"
        assert "truncated" in err

    def test_exact_limit_no_truncation(self, capsys):
        """Text exactly at the byte limit is not truncated."""
        from think.utils import truncated_echo

        payload = "x" * 100
        truncated_echo(payload, max_bytes=100)
        out, err = capsys.readouterr()
        assert out == payload + "\n"
        assert err == ""
588
589
def test_segment_key_hhmmss_with_duration():
    """Test segment_key with HHMMSS_LEN format."""
    # A bare HHMMSS_LEN string is returned unchanged.
    for token in ("143022_300", "095604_303", "120000_3600", "000000_1"):
        assert segment_key(token) == token
596
597
def test_segment_key_hhmmss_len_with_suffix():
    """Test segment_key with HHMMSS_LEN_suffix format."""
    cases = {
        "143022_300_audio": "143022_300",
        "095604_303_screen": "095604_303",
        "120000_3600_recording": "120000_3600",
        "000000_1_mic_sys": "000000_1",
    }
    for raw, expected in cases.items():
        assert segment_key(raw) == expected
604
605
def test_segment_key_with_file_extension():
    """Test segment_key with various file extensions."""
    cases = {
        "143022_300_audio.flac": "143022_300",
        "095604_303_screen.webm": "095604_303",
        "143022_300.jsonl": "143022_300",
    }
    for raw, expected in cases.items():
        assert segment_key(raw) == expected
611
612
def test_segment_key_in_path():
    """Test segment_key extraction from full paths."""
    cases = {
        "/journal/20250109/143022_300/audio.jsonl": "143022_300",
        "/home/user/20250110/095604_303_screen.webm": "095604_303",
        "20250110/143022_300_audio.flac": "143022_300",
    }
    for raw, expected in cases.items():
        assert segment_key(raw) == expected
618
619
def test_segment_key_invalid_formats():
    """Test segment_key with invalid formats returns None."""
    rejected = [
        "invalid",
        "12345",      # too short
        "1234567",    # too long
        "abcdef",     # not digits
        "14:30:22",   # wrong separator
        "",
        "_143022",
        # Legacy formats without a duration now return None.
        "143022",
        "143022_audio",
        "143022_screen",
    ]
    for raw in rejected:
        assert segment_key(raw) is None
633
634
def test_segment_key_edge_cases():
    """Test segment_key with edge cases."""
    # Extra underscores in the suffix are ignored.
    assert segment_key("143022_300_mic_sys_audio") == "143022_300"
    # A non-word-boundary prefix prevents a match.
    assert segment_key("prefix_143022_300_suffix") is None
    # Path separators and spaces are word boundaries, so these match.
    assert segment_key("prefix/143022_300/suffix") == "143022_300"
    assert segment_key("prefix 143022_300 suffix") == "143022_300"
    # When several candidates exist, the first one wins.
    assert segment_key("143022_300 and 150000_600") == "143022_300"
646
647
def test_segment_parse_clamps_midnight_crossing():
    """Test segment_parse clamps end time when a segment crosses midnight."""
    # 23:59:00 + 300s would roll past midnight; the end clamps to 23:59:59.
    assert segment_parse("235900_300") == (time(23, 59, 0), time(23, 59, 59))
    # A segment fully inside the day keeps its computed end time.
    assert segment_parse("143022_300") == (time(14, 30, 22), time(14, 35, 22))
652
653
class TestSetupCliConfigEnv:
    """Tests for config env injection via setup_cli()."""

    @pytest.fixture
    def cli_env(self, monkeypatch, tmp_path):
        """Set up a journal with config and mock sys.argv for setup_cli tests.

        Returns a helper function to write config and run setup_cli.
        """
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
        monkeypatch.setattr(sys, "argv", ["test"])

        def run_with_config(config: dict | None = None):
            """Write config to journal and run setup_cli."""
            if config is not None:
                config_dir = tmp_path / "config"
                config_dir.mkdir(exist_ok=True)
                (config_dir / "journal.json").write_text(json.dumps(config))
            setup_cli(argparse.ArgumentParser())

        return run_with_config

    def test_config_env_injected_into_os_environ(self, monkeypatch, cli_env):
        """Test that config env values are injected into os.environ."""
        for var in ("TEST_API_KEY", "ANOTHER_VAR"):
            monkeypatch.delenv(var, raising=False)

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {
                    "TEST_API_KEY": "from_config",
                    "ANOTHER_VAR": "also_from_config",
                },
            }
        )

        assert os.environ.get("TEST_API_KEY") == "from_config"
        assert os.environ.get("ANOTHER_VAR") == "also_from_config"

    def test_journal_config_overrides_shell_env(self, monkeypatch, cli_env):
        """Test that journal.json config is the strict source for env vars."""
        monkeypatch.setenv("EXISTING_VAR", "from_shell")

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {"EXISTING_VAR": "from_config"},
            }
        )

        assert os.environ.get("EXISTING_VAR") == "from_config"

    def test_empty_shell_env_allows_config_override(self, monkeypatch, cli_env):
        """Test that empty shell env values are overridden by config."""
        monkeypatch.setenv("EMPTY_VAR", "")

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {"EMPTY_VAR": "from_config"},
            }
        )

        assert os.environ.get("EMPTY_VAR") == "from_config"

    def test_missing_env_section_is_safe(self, cli_env):
        """Test that missing env section in config doesn't cause errors."""
        cli_env({"identity": {"name": "Test"}})

    def test_missing_config_file_is_safe(self, cli_env):
        """Test that missing config file doesn't cause errors."""
        cli_env(None)  # No config file

    def test_config_env_converts_non_string_values(self, monkeypatch, cli_env):
        """Test that non-string config values are converted to strings."""
        for var in ("INT_VAR", "BOOL_VAR"):
            monkeypatch.delenv(var, raising=False)

        cli_env(
            {
                "identity": {"name": "Test"},
                "env": {"INT_VAR": 42, "BOOL_VAR": True},
            }
        )

        assert os.environ.get("INT_VAR") == "42"
        assert os.environ.get("BOOL_VAR") == "True"
748
749
class TestPortDiscovery:
    """Tests for service port discovery utilities."""

    def test_find_available_port_returns_valid_port(self):
        """Test that find_available_port returns a valid port number."""
        from think.utils import find_available_port

        candidate = find_available_port()
        assert isinstance(candidate, int)
        assert 1024 <= candidate <= 65535  # user-space port range

    def test_find_available_port_different_each_call(self):
        """Test that multiple calls can return different ports."""
        from think.utils import find_available_port

        # Uniqueness is not guaranteed, but every result must be valid.
        for candidate in (find_available_port() for _ in range(3)):
            assert isinstance(candidate, int)
            assert 1024 <= candidate <= 65535

    def test_write_and_read_service_port(self, monkeypatch, tmp_path):
        """Test writing and reading a service port file."""
        from think.utils import read_service_port, write_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        write_service_port("test_service", 12345)
        assert read_service_port("test_service") == 12345

        # The port file lands under the journal's health directory.
        port_file = tmp_path / "health" / "test_service.port"
        assert port_file.exists()
        assert port_file.read_text() == "12345"

    def test_read_service_port_missing_file(self, monkeypatch, tmp_path):
        """Test that reading missing port file returns None."""
        from think.utils import read_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
        assert read_service_port("nonexistent") is None

    def test_read_service_port_invalid_content(self, monkeypatch, tmp_path):
        """Test that reading invalid port file content returns None."""
        from think.utils import read_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        # Seed a port file whose contents cannot parse as an integer.
        health_dir = tmp_path / "health"
        health_dir.mkdir()
        (health_dir / "bad_service.port").write_text("not a number")

        assert read_service_port("bad_service") is None

    def test_write_service_port_creates_health_dir(self, monkeypatch, tmp_path):
        """Test that write_service_port creates health directory if needed."""
        from think.utils import write_service_port

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

        health_dir = tmp_path / "health"
        assert not health_dir.exists()  # nothing created before first write

        write_service_port("new_service", 9999)

        assert health_dir.exists()
        assert (health_dir / "new_service.port").read_text() == "9999"
828
829
class TestIterSegments:
    """Tests for iter_segments day-directory traversal."""

    def test_skips_health_directory(self, tmp_path):
        """iter_segments does not return segments from health/ dirs."""
        day = tmp_path / "20240101"
        day.mkdir()
        (day / "health" / "120000_300").mkdir(parents=True)
        (day / "default" / "130000_300").mkdir(parents=True)

        streams = [stream for stream, _key, _path in iter_segments(day)]
        assert "health" not in streams
        assert "default" in streams

    def test_toplevel_segments_as_default_stream(self, tmp_path):
        """Top-level segment dirs are returned with _default stream name."""
        day = tmp_path / "20240101"
        day.mkdir()
        (day / "143022_300").mkdir()
        (day / "default" / "150000_300").mkdir(parents=True)

        results = iter_segments(day)
        assert len(results) == 2

        # Group segment keys by stream name for targeted checks.
        by_stream = {}
        for stream, key, _path in results:
            by_stream.setdefault(stream, []).append(key)
        assert by_stream[DEFAULT_STREAM] == ["143022_300"]
        assert len(by_stream["default"]) == 1

    def test_normal_stream_discovery_unchanged(self, tmp_path):
        """Normal stream/segment discovery still works correctly."""
        day = tmp_path / "20240101"
        day.mkdir()
        for stream, key in (
            ("default", "100000_300"),
            ("default", "110000_300"),
            ("import.apple", "120000_600"),
        ):
            (day / stream / key).mkdir(parents=True)

        results = iter_segments(day)
        assert [key for _stream, key, _path in results] == [
            "100000_300",
            "110000_300",
            "120000_600",
        ]
        assert results[0][0] == "default"
        assert results[2][0] == "import.apple"