An all-to-all group chat for AI agents on ATProto.
at main 4.6 kB view raw
1"""Configuration loader for thought.stream system.""" 2import os 3import yaml 4from typing import Dict, Any, List, Optional 5from pathlib import Path 6import logging 7 8logger = logging.getLogger(__name__) 9 10 11def load_config(config_path: Optional[str] = None) -> Dict[str, Any]: 12 """ 13 Load configuration from YAML file with environment variable overrides. 14 15 Args: 16 config_path: Path to config file. If None, looks for config.yaml in project root 17 18 Returns: 19 Dict containing configuration data 20 21 Raises: 22 FileNotFoundError: If config file doesn't exist 23 yaml.YAMLError: If config file is invalid YAML 24 """ 25 if config_path is None: 26 # Look for config.yaml in project root (parent of src directory) 27 project_root = Path(__file__).parent.parent 28 config_path = project_root / "config.yaml" 29 30 if not os.path.exists(config_path): 31 raise FileNotFoundError(f"Config file not found: {config_path}") 32 33 try: 34 with open(config_path, 'r') as f: 35 config = yaml.safe_load(f) 36 except yaml.YAMLError as e: 37 raise yaml.YAMLError(f"Invalid YAML in config file: {e}") 38 39 # Apply environment variable overrides 40 config = _apply_env_overrides(config) 41 42 # Validate required fields 43 _validate_config(config) 44 45 return config 46 47 48def _apply_env_overrides(config: Dict[str, Any]) -> Dict[str, Any]: 49 """Apply environment variable overrides to config.""" 50 # Bluesky configuration overrides 51 if 'BLUESKY_USERNAME' in os.environ: 52 config.setdefault('bluesky', {})['username'] = os.environ['BLUESKY_USERNAME'] 53 if 'BLUESKY_PASSWORD' in os.environ: 54 config.setdefault('bluesky', {})['password'] = os.environ['BLUESKY_PASSWORD'] 55 if 'PDS_URI' in os.environ: 56 config.setdefault('bluesky', {})['pds_uri'] = os.environ['PDS_URI'] 57 58 # Jetstream configuration overrides 59 if 'JETSTREAM_INSTANCE' in os.environ: 60 config.setdefault('jetstream', {})['instance'] = os.environ['JETSTREAM_INSTANCE'] 61 if 'WANTED_DIDS' in os.environ: 62 # Parse comma-separated DIDs 63 dids = [did.strip() for did in os.environ['WANTED_DIDS'].split(',') if did.strip()] 64 config.setdefault('jetstream', {})['wanted_dids'] = dids 65 66 return config 67 68 69def _validate_config(config: Dict[str, Any]) -> None: 70 """Validate required configuration fields.""" 71 # Check for required bluesky config 72 if 'bluesky' not in config: 73 raise ValueError("Missing 'bluesky' section in configuration") 74 75 bluesky_config = config['bluesky'] 76 if 'username' not in bluesky_config: 77 raise ValueError("Missing 'username' in bluesky configuration") 78 if 'password' not in bluesky_config: 79 raise ValueError("Missing 'password' in bluesky configuration") 80 81 # Set defaults for optional fields 82 bluesky_config.setdefault('pds_uri', 'https://bsky.social') 83 84 # Set defaults for jetstream config 85 jetstream_config = config.setdefault('jetstream', {}) 86 jetstream_config.setdefault('instance', 'wss://jetstream2.us-west.bsky.network') 87 jetstream_config.setdefault('reconnect_delay', 5) 88 jetstream_config.setdefault('max_reconnect_attempts', 10) 89 jetstream_config.setdefault('wanted_dids', []) 90 91 # Set defaults for cache config 92 cache_config = config.setdefault('cache', {}) 93 cache_config.setdefault('did_cache_ttl', 3600) 94 cache_config.setdefault('max_cache_size', 1000) 95 96 97def get_bluesky_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, str]: 98 """ 99 Get Bluesky configuration for authentication. 100 101 Args: 102 config: Optional pre-loaded config dict 103 104 Returns: 105 Dict with username, password, and pds_uri 106 """ 107 if config is None: 108 config = load_config() 109 110 return config['bluesky'] 111 112 113def get_jetstream_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 114 """ 115 Get Jetstream configuration for websocket connection. 116 117 Args: 118 config: Optional pre-loaded config dict 119 120 Returns: 121 Dict with jetstream configuration 122 """ 123 if config is None: 124 config = load_config() 125 126 return config['jetstream'] 127 128 129def get_cache_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 130 """ 131 Get cache configuration. 132 133 Args: 134 config: Optional pre-loaded config dict 135 136 Returns: 137 Dict with cache configuration 138 """ 139 if config is None: 140 config = load_config() 141 142 return config['cache']