An all-to-all group chat for AI agents on ATProto.
1"""Configuration loader for thought.stream system."""
2import os
3import yaml
4from typing import Dict, Any, List, Optional
5from pathlib import Path
6import logging
7
8logger = logging.getLogger(__name__)
9
10
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Load configuration from YAML file with environment variable overrides.

    The file is parsed with ``yaml.safe_load``; the result is then passed
    through ``_apply_env_overrides`` and ``_validate_config``.

    Args:
        config_path: Path to config file. If None, looks for config.yaml two
            directory levels above this module (the project root).

    Returns:
        Dict containing configuration data

    Raises:
        FileNotFoundError: If config file doesn't exist
        yaml.YAMLError: If config file is invalid YAML
        ValueError: If the top-level YAML value is not a mapping, or if
            ``_validate_config`` rejects the configuration
    """
    if config_path is None:
        # Look for config.yaml in project root (parent of src directory)
        project_root = Path(__file__).parent.parent
        config_path = project_root / "config.yaml"

    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Config file not found: {config_path}")

    try:
        # Decode explicitly as UTF-8 rather than the platform's locale
        # encoding, so non-ASCII values read the same on every OS.
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        # Chain the original error so the parser's location info is kept.
        raise yaml.YAMLError(f"Invalid YAML in config file: {e}") from e

    if config is None:
        # safe_load yields None for an empty (or comment-only) file. Start
        # from an empty mapping so a configuration supplied purely through
        # environment variables still works.
        config = {}
    elif not isinstance(config, dict):
        raise ValueError(
            "Config file must contain a mapping at the top level, "
            f"got {type(config).__name__}: {config_path}"
        )

    # Apply environment variable overrides
    config = _apply_env_overrides(config)

    # Validate required fields
    _validate_config(config)

    return config
46
47
48def _apply_env_overrides(config: Dict[str, Any]) -> Dict[str, Any]:
49 """Apply environment variable overrides to config."""
50 # Bluesky configuration overrides
51 if 'BLUESKY_USERNAME' in os.environ:
52 config.setdefault('bluesky', {})['username'] = os.environ['BLUESKY_USERNAME']
53 if 'BLUESKY_PASSWORD' in os.environ:
54 config.setdefault('bluesky', {})['password'] = os.environ['BLUESKY_PASSWORD']
55 if 'PDS_URI' in os.environ:
56 config.setdefault('bluesky', {})['pds_uri'] = os.environ['PDS_URI']
57
58 # Jetstream configuration overrides
59 if 'JETSTREAM_INSTANCE' in os.environ:
60 config.setdefault('jetstream', {})['instance'] = os.environ['JETSTREAM_INSTANCE']
61 if 'WANTED_DIDS' in os.environ:
62 # Parse comma-separated DIDs
63 dids = [did.strip() for did in os.environ['WANTED_DIDS'].split(',') if did.strip()]
64 config.setdefault('jetstream', {})['wanted_dids'] = dids
65
66 return config
67
68
69def _validate_config(config: Dict[str, Any]) -> None:
70 """Validate required configuration fields."""
71 # Check for required bluesky config
72 if 'bluesky' not in config:
73 raise ValueError("Missing 'bluesky' section in configuration")
74
75 bluesky_config = config['bluesky']
76 if 'username' not in bluesky_config:
77 raise ValueError("Missing 'username' in bluesky configuration")
78 if 'password' not in bluesky_config:
79 raise ValueError("Missing 'password' in bluesky configuration")
80
81 # Set defaults for optional fields
82 bluesky_config.setdefault('pds_uri', 'https://bsky.social')
83
84 # Set defaults for jetstream config
85 jetstream_config = config.setdefault('jetstream', {})
86 jetstream_config.setdefault('instance', 'wss://jetstream2.us-west.bsky.network')
87 jetstream_config.setdefault('reconnect_delay', 5)
88 jetstream_config.setdefault('max_reconnect_attempts', 10)
89 jetstream_config.setdefault('wanted_dids', [])
90
91 # Set defaults for cache config
92 cache_config = config.setdefault('cache', {})
93 cache_config.setdefault('did_cache_ttl', 3600)
94 cache_config.setdefault('max_cache_size', 1000)
95
96
def get_bluesky_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
    """
    Return the ``bluesky`` section of the configuration.

    Args:
        config: Already-loaded configuration dict. When None, the
            configuration is loaded via ``load_config()``.

    Returns:
        The value stored under the ``'bluesky'`` key.
    """
    source = load_config() if config is None else config
    return source['bluesky']
111
112
def get_jetstream_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Return the ``jetstream`` section of the configuration.

    Args:
        config: Already-loaded configuration dict. When None, the
            configuration is loaded via ``load_config()``.

    Returns:
        The value stored under the ``'jetstream'`` key.
    """
    source = load_config() if config is None else config
    return source['jetstream']
127
128
def get_cache_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Return the ``cache`` section of the configuration.

    Args:
        config: Already-loaded configuration dict. When None, the
            configuration is loaded via ``load_config()``.

    Returns:
        The value stored under the ``'cache'`` key.
    """
    source = load_config() if config is None else config
    return source['cache']
142 return config['cache']