"""Configuration loader for thought.stream system.""" import os import yaml from typing import Dict, Any, List, Optional from pathlib import Path import logging logger = logging.getLogger(__name__) def load_config(config_path: Optional[str] = None) -> Dict[str, Any]: """ Load configuration from YAML file with environment variable overrides. Args: config_path: Path to config file. If None, looks for config.yaml in project root Returns: Dict containing configuration data Raises: FileNotFoundError: If config file doesn't exist yaml.YAMLError: If config file is invalid YAML """ if config_path is None: # Look for config.yaml in project root (parent of src directory) project_root = Path(__file__).parent.parent config_path = project_root / "config.yaml" if not os.path.exists(config_path): raise FileNotFoundError(f"Config file not found: {config_path}") try: with open(config_path, 'r') as f: config = yaml.safe_load(f) except yaml.YAMLError as e: raise yaml.YAMLError(f"Invalid YAML in config file: {e}") # Apply environment variable overrides config = _apply_env_overrides(config) # Validate required fields _validate_config(config) return config def _apply_env_overrides(config: Dict[str, Any]) -> Dict[str, Any]: """Apply environment variable overrides to config.""" # Bluesky configuration overrides if 'BLUESKY_USERNAME' in os.environ: config.setdefault('bluesky', {})['username'] = os.environ['BLUESKY_USERNAME'] if 'BLUESKY_PASSWORD' in os.environ: config.setdefault('bluesky', {})['password'] = os.environ['BLUESKY_PASSWORD'] if 'PDS_URI' in os.environ: config.setdefault('bluesky', {})['pds_uri'] = os.environ['PDS_URI'] # Jetstream configuration overrides if 'JETSTREAM_INSTANCE' in os.environ: config.setdefault('jetstream', {})['instance'] = os.environ['JETSTREAM_INSTANCE'] if 'WANTED_DIDS' in os.environ: # Parse comma-separated DIDs dids = [did.strip() for did in os.environ['WANTED_DIDS'].split(',') if did.strip()] config.setdefault('jetstream', {})['wanted_dids'] = dids return config def _validate_config(config: Dict[str, Any]) -> None: """Validate required configuration fields.""" # Check for required bluesky config if 'bluesky' not in config: raise ValueError("Missing 'bluesky' section in configuration") bluesky_config = config['bluesky'] if 'username' not in bluesky_config: raise ValueError("Missing 'username' in bluesky configuration") if 'password' not in bluesky_config: raise ValueError("Missing 'password' in bluesky configuration") # Set defaults for optional fields bluesky_config.setdefault('pds_uri', 'https://bsky.social') # Set defaults for jetstream config jetstream_config = config.setdefault('jetstream', {}) jetstream_config.setdefault('instance', 'wss://jetstream2.us-west.bsky.network') jetstream_config.setdefault('reconnect_delay', 5) jetstream_config.setdefault('max_reconnect_attempts', 10) jetstream_config.setdefault('wanted_dids', []) # Set defaults for cache config cache_config = config.setdefault('cache', {}) cache_config.setdefault('did_cache_ttl', 3600) cache_config.setdefault('max_cache_size', 1000) def get_bluesky_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, str]: """ Get Bluesky configuration for authentication. Args: config: Optional pre-loaded config dict Returns: Dict with username, password, and pds_uri """ if config is None: config = load_config() return config['bluesky'] def get_jetstream_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Get Jetstream configuration for websocket connection. Args: config: Optional pre-loaded config dict Returns: Dict with jetstream configuration """ if config is None: config = load_config() return config['jetstream'] def get_cache_config(config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Get cache configuration. Args: config: Optional pre-loaded config dict Returns: Dict with cache configuration """ if config is None: config = load_config() return config['cache']