#!/usr/bin/env python3
"""Interactive setup wizard for Jetstream-Letta bridge configuration."""

import asyncio
import getpass
import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests
import yaml
from rich import print as rprint
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.prompt import Confirm, Prompt
from rich.table import Table

console = Console()


class SetupWizard:
    """Interactive setup wizard for agent configuration.

    The wizard collects answers on the terminal, accumulates them in
    ``self.config`` (a plain nested dict) and finally writes that dict as
    YAML into the ``agents`` directory located one level above this
    script's directory.
    """

    DEFAULT_PDS_URI = "https://bsky.social"
    DEFAULT_JETSTREAM_INSTANCE = "wss://jetstream2.us-west.bsky.network"
    # Adjust URL as needed for your Letta instance.
    LETTA_AGENTS_URL = "https://api.letta.ai/v1/agents"
    DEFAULT_CONFIG_FILENAME = "agent.yaml"

    def __init__(self):
        self.config: Dict[str, Any] = {}
        # Initialised here so save_config() works even when
        # get_agent_basic_info() has not been run first.
        self.config_filename = self.DEFAULT_CONFIG_FILENAME
        self.script_dir = Path(__file__).parent.parent
        self.agents_dir = self.script_dir / "agents"

        # Ensure agents directory exists
        self.agents_dir.mkdir(exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _ask_positive_int(message: str, default: int) -> int:
        """Prompt until the user enters an integer greater than zero.

        Args:
            message: Prompt text shown to the user.
            default: Value offered (and used) when the user just presses Enter.

        Returns:
            The validated positive integer.
        """
        while True:
            raw = Prompt.ask(message, default=str(default))
            try:
                value = int(raw)
            except ValueError:
                # Re-prompt instead of letting the error abort the wizard.
                console.print("[red]Please enter a number[/red]")
                continue
            if value > 0:
                return value
            console.print("[red]Please enter a number greater than zero[/red]")

    @classmethod
    def _normalize_filename(cls, name: str, fallback: Optional[str] = None) -> str:
        """Turn user input into a bare ``*.yaml`` / ``*.yml`` file name.

        Directory components are dropped so the file always lands directly
        in the agents directory, and a ``.yaml`` suffix is appended when no
        YAML suffix is present.

        Args:
            name: Raw text typed by the user.
            fallback: Name to use when ``name`` contains no usable file name;
                defaults to ``DEFAULT_CONFIG_FILENAME``.

        Returns:
            A file name without any directory part, ending in .yaml or .yml.
        """
        base = Path(name.strip()).name
        if base in ("", ".", ".."):
            return fallback or cls.DEFAULT_CONFIG_FILENAME
        if not base.endswith((".yaml", ".yml")):
            base += ".yaml"
        return base

    @staticmethod
    def _summarize_tools(tools: Any, limit: int = 3) -> str:
        """Return a short comma-separated preview of an agent's tools.

        Entries may be plain strings or dicts; for dicts the ``name`` key is
        shown. Anything that is not a list/tuple yields an empty string.
        """
        if not isinstance(tools, (list, tuple)):
            return ""
        names = [
            str(tool.get("name", "?")) if isinstance(tool, dict) else str(tool)
            for tool in tools
        ]
        summary = ", ".join(names[:limit])
        if len(names) > limit:
            summary += "..."
        return summary

    @classmethod
    def _default_jetstream_config(cls) -> Dict[str, Any]:
        """Return a fresh default ``jetstream`` section."""
        return {
            "instance": cls.DEFAULT_JETSTREAM_INSTANCE,
            "wanted_dids": [],
            "reconnect_delay": 5,
            "max_reconnect_attempts": 10,
        }

    @staticmethod
    def _default_cache_config() -> Dict[str, Any]:
        """Return a fresh default ``cache`` section."""
        return {"did_cache_ttl": 3600, "max_cache_size": 1000}

    @staticmethod
    def _default_bridge_config() -> Dict[str, Any]:
        """Return a fresh default ``bridge`` section."""
        return {
            "prompt_template": "[@{author}] {content}",
            "include_metadata": True,
        }

    def _show_agent_table(self, agents: List[Dict[str, Any]]) -> None:
        """Print a numbered table of the given agents."""
        table = Table(show_header=True, header_style="bold blue")
        table.add_column("#", style="dim", width=3)
        table.add_column("Agent ID", style="cyan")
        table.add_column("Name", style="white")
        table.add_column("Model", style="yellow")
        table.add_column("Tools", style="green")

        for i, agent in enumerate(agents, 1):
            # Cell text is parsed as rich markup, so remote values are escaped.
            table.add_row(
                str(i),
                escape(str(agent.get("id", ""))),
                escape(str(agent.get("name") or "Unknown")),
                escape(str(agent.get("model") or "Unknown")),
                escape(self._summarize_tools(agent.get("tools"))),
            )

        console.print(table)

    @staticmethod
    def _select_agent(agents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Prompt until the user picks a valid 1-based index into ``agents``."""
        while True:
            choice = Prompt.ask(f"\nSelect agent (1-{len(agents)})")
            try:
                agent_idx = int(choice) - 1
            except ValueError:
                console.print("[red]Please enter a number[/red]")
                continue
            if 0 <= agent_idx < len(agents):
                return agents[agent_idx]
            console.print("[red]Invalid choice, please try again[/red]")

    # ------------------------------------------------------------------
    # Wizard steps
    # ------------------------------------------------------------------

    def welcome(self):
        """Display welcome message and exit if the user is not ready."""
        console.clear()

        rprint(Panel.fit(
            "[bold blue]🤖 Jetstream-Letta Bridge Setup Wizard[/bold blue]\n\n"
            "This wizard will help you configure a new agent for the\n"
            "Jetstream-Letta bridge system.\n\n"
            "[dim]You'll need:[/dim]\n"
            "• Bluesky account with app password\n"
            "• Letta API key and agent ID\n"
            "• (Optional) Specific DIDs to monitor",
            title="Welcome"
        ))

        console.print()
        if not Confirm.ask("Ready to set up your agent?", default=True):
            console.print("👋 Setup cancelled. Run again when ready!")
            sys.exit(0)

    def get_agent_basic_info(self):
        """Ask for the configuration name and derive ``self.config_filename``."""
        console.print("\n[bold blue]📝 Configuration Name[/bold blue]")

        # Configuration filename
        console.print("[dim]This name will be used for your configuration filename[/dim]")
        agent_name = Prompt.ask(
            "Configuration name",
            default="agent"
        )

        # Normalising avoids "name.yaml.yaml" and stray directory components.
        self.config_filename = self._normalize_filename(agent_name)

    def get_bluesky_config(self):
        """Collect Bluesky credentials into ``self.config["bluesky"]``.

        Optionally tests the credentials; on failure the user may continue
        anyway, otherwise the process exits with status 1.
        """
        console.print("\n[bold blue]🦋 Bluesky Configuration[/bold blue]")
        console.print("Configure your Bluesky account for posting responses.\n")

        # Username
        username = Prompt.ask("Bluesky handle or email (e.g., alice.bsky.social)")

        # App password
        console.print("\n[yellow]⚠️ Use an app password, not your main password![/yellow]")
        console.print("Generate one at: https://bsky.app/settings/app-passwords\n")

        password = getpass.getpass("Bluesky app password: ")
        if not password:
            # NOTE(review): the prompt text says "will be visible" but
            # password=True hides the input; confirm which one is intended.
            password = Prompt.ask("App password (will be visible)", password=True)

        # PDS URI
        pds_uri = Prompt.ask(
            "PDS URI",
            default=self.DEFAULT_PDS_URI
        )

        # Validate PDS URI format
        if not self.validate_url(pds_uri):
            console.print("[red]⚠️ Invalid URL format, using default[/red]")
            pds_uri = self.DEFAULT_PDS_URI

        self.config["bluesky"] = {
            "username": username,
            "password": password,
            "pds_uri": pds_uri
        }

        # Test connection if requested
        if Confirm.ask("Test Bluesky connection now?", default=True):
            if self.test_bluesky_auth():
                console.print("[green]✅ Bluesky authentication successful![/green]")
            else:
                console.print("[red]❌ Bluesky authentication failed[/red]")
                if not Confirm.ask("Continue anyway?", default=False):
                    console.print("Please check your credentials and try again.")
                    sys.exit(1)

    def get_letta_config(self):
        """Collect Letta settings and write the related config sections.

        Writes ``letta`` and ``agent`` from the answers, and also fills
        ``jetstream``, ``cache`` and ``bridge`` with default values.
        """
        console.print("\n[bold blue]🧠 Letta Configuration[/bold blue]")
        console.print("Configure connection to your Letta agent.\n")

        # API Key
        api_key = getpass.getpass("Letta API key (sk-let-...): ")
        if not api_key:
            # NOTE(review): same "will be visible" vs password=True mismatch
            # as in get_bluesky_config.
            api_key = Prompt.ask("API key (will be visible)", password=True)

        # Validate API key format (warning only; the key is still accepted)
        if not api_key.startswith("sk-let-"):
            console.print("[yellow]⚠️ API key should start with 'sk-let-'[/yellow]")

        # Test API connection and get agent list
        agents = self.get_letta_agents(api_key)

        if agents:
            console.print(f"\n[green]✅ Found {len(agents)} agents in your Letta account:[/green]")
            self._show_agent_table(agents)

            # Let user select agent
            selected_agent = self._select_agent(agents)
            agent_id = selected_agent.get("id", "")
            agent_label = escape(str(selected_agent.get("name", "Unknown")))
            console.print(
                f"\n[green]Selected agent: {agent_label} ({escape(str(agent_id))})[/green]"
            )
        else:
            # Manual agent ID entry
            console.print("\n[yellow]⚠️ Could not fetch agent list or no agents found[/yellow]")
            agent_id = Prompt.ask("Agent ID (uuid format)").strip()
            while not agent_id:
                # An empty ID would produce an unusable configuration.
                console.print("[red]Agent ID is required[/red]")
                agent_id = Prompt.ask("Agent ID (uuid format)").strip()

        # Timeout setting
        timeout = self._ask_positive_int("Request timeout (seconds)", 600)

        self.config["letta"] = {
            "api_key": api_key,
            "timeout": timeout,
            "agent_id": agent_id
        }

        # Create agent section with the ID and defaults
        self.config["agent"] = {
            "agent_id": agent_id,
            "batch_size": 1,
            "max_steps": 100
        }

        # Add default jetstream and cache configs
        self.config["jetstream"] = self._default_jetstream_config()
        self.config["cache"] = self._default_cache_config()
        self.config["bridge"] = self._default_bridge_config()

    def get_jetstream_config(self):
        """Interactively collect ``self.config["jetstream"]``.

        Asks for the Jetstream instance and, optionally, a list of DIDs to
        monitor. Not invoked by ``run()`` in this file.
        """
        console.print("\n[bold blue]🌊 Jetstream Configuration[/bold blue]")
        console.print("Configure which communities your agent should monitor.\n")

        # Jetstream instance
        instance = Prompt.ask(
            "Jetstream instance",
            default=self.DEFAULT_JETSTREAM_INSTANCE
        )

        # DIDs to monitor
        console.print("\n[bold]Community Monitoring:[/bold]")
        console.print("You can monitor specific DIDs (users/communities) or leave empty to monitor all.")
        console.print("DIDs look like: did:plc:abcd1234efgh5678...")

        wanted_dids = []
        if Confirm.ask("Monitor specific DIDs only?", default=False):
            console.print("\nEnter DIDs one by one (press Enter with empty input to finish):")
            while True:
                did = Prompt.ask("DID (or press Enter to finish)", default="")
                if not did:
                    break

                if self.validate_did(did):
                    wanted_dids.append(did)
                    console.print(f"[green]✅ Added: {escape(did)}[/green]")
                else:
                    console.print("[red]❌ Invalid DID format[/red]")

        jetstream = self._default_jetstream_config()
        jetstream["instance"] = instance
        jetstream["wanted_dids"] = wanted_dids
        self.config["jetstream"] = jetstream

    def get_listener_config(self):
        """Collect the listener mode and its settings into ``self.config["listener"]``."""
        console.print("\n[bold blue]🔄 Agent Listening Mode[/bold blue]")
        console.print("Configure how your agent will listen and respond.\n")

        # Listener mode selection
        console.print("[bold]Available modes:[/bold]")
        console.print("1. [cyan]event[/cyan] - Only responds when messages are queued (efficient)")
        console.print("2. [cyan]poll[/cyan] - Sends prompts at regular intervals (periodic messaging)")
        console.print("3. [cyan]interactive[/cyan] - Manual prompt entry for testing")

        # Prompt.ask re-asks until the answer is one of `choices`, so the
        # numeric shortcuts only need mapping onto their mode names.
        numbered_modes = {"1": "event", "2": "poll", "3": "interactive"}
        mode_choice = Prompt.ask(
            "\nSelect mode",
            choices=["1", "2", "3", "event", "poll", "interactive"],
            default="1"
        )
        mode = numbered_modes.get(mode_choice, mode_choice)

        console.print(f"[green]Selected mode: {mode}[/green]")

        # Default listener config
        listener_config = {
            "mode": mode,
            "queue_check_interval": 5,
            "prompt_template": "What's on your mind? Feel free to share any thoughts using send_message."
        }

        # Poll-specific configuration
        if mode == "poll":
            console.print("\n[bold]Periodic Messaging Configuration:[/bold]")

            # Poll interval
            listener_config["poll_interval"] = self._ask_positive_int(
                "How often to prompt agent (seconds)", 60
            )

            # Auto prompts
            auto_prompts = []
            if Confirm.ask("Configure custom prompts to cycle through?", default=False):
                console.print("\nEnter prompts (press Enter with empty input to finish):")
                while True:
                    prompt = Prompt.ask("Prompt (or press Enter to finish)", default="")
                    if not prompt:
                        break
                    auto_prompts.append(prompt)
                    preview = escape(prompt[:50])
                    console.print(f"[green]✅ Added: {preview}{'...' if len(prompt) > 50 else ''}[/green]")

                # Key is only written when at least one prompt was entered.
                if auto_prompts:
                    listener_config["auto_prompts"] = auto_prompts
            else:
                # Add default auto prompts
                listener_config["auto_prompts"] = [
                    "What's happening in your world today?",
                    "Any interesting thoughts to share?",
                    "How are you feeling about recent events?",
                    "What would you like to tell the network?"
                ]

        elif mode == "event":
            # Event mode specific settings
            listener_config["queue_check_interval"] = self._ask_positive_int(
                "Queue check interval (seconds)", 5
            )

        self.config["listener"] = listener_config

        console.print(f"\n[green]✅ Listener configured for {mode} mode[/green]")
        if mode == "poll":
            console.print(f"[dim]Agent will send prompts every {listener_config['poll_interval']} seconds[/dim]")

    def get_additional_config(self):
        """Write default ``cache`` and ``bridge`` sections.

        Not invoked by ``run()`` in this file.
        """
        console.print("\n[bold blue]⚙️ Additional Settings[/bold blue]")

        # Cache settings
        self.config["cache"] = self._default_cache_config()

        # Bridge settings
        self.config["bridge"] = self._default_bridge_config()

    def save_config(self):
        """Write ``self.config`` as YAML into the agents directory.

        Keeps asking for a file name until the target does not exist or the
        user agrees to overwrite it. Exits with status 1 when the file
        cannot be written.
        """
        console.print("\n[bold blue]💾 Saving Configuration[/bold blue]")

        # Confirm filename
        filename = self._normalize_filename(
            Prompt.ask("Configuration filename", default=self.config_filename),
            self.config_filename,
        )
        config_path = self.agents_dir / filename

        # Every candidate name goes through the same normalisation and
        # existence check, including names entered after declining overwrite.
        while config_path.exists() and not Confirm.ask(
            f"File {filename} already exists. Overwrite?", default=False
        ):
            filename = self._normalize_filename(
                Prompt.ask("Enter a different filename"), filename
            )
            config_path = self.agents_dir / filename

        try:
            # The file holds the Bluesky app password and the Letta API key,
            # so restrict it to the owner before any secret is written.
            config_path.touch(mode=0o600, exist_ok=True)
            config_path.chmod(0o600)
            with open(config_path, "w", encoding="utf-8") as f:
                yaml.safe_dump(
                    self.config,
                    f,
                    default_flow_style=False,
                    indent=2,
                    sort_keys=False,
                )
        except (OSError, yaml.YAMLError) as e:
            console.print(f"[red]❌ Error saving configuration: {escape(str(e))}[/red]")
            sys.exit(1)

        console.print(f"[green]✅ Configuration saved to: {config_path}[/green]")

        # Show usage instructions
        self.show_usage_instructions(filename)

    def show_usage_instructions(self, filename):
        """Print the commands for starting the agent configured in ``filename``."""
        console.print("\n[bold green]🎉 Setup Complete![/bold green]")

        # stem drops exactly one trailing suffix, so ".yml" is handled too
        # and a ".yaml" in the middle of the name is left alone.
        agent_name = Path(filename).stem

        usage_panel = Panel(
            f"[bold]Your agent is ready to use![/bold]\n\n"
            f"[cyan]Start your agent:[/cyan]\n"
            f" python src/jetstream_letta_bridge.py --agent agents/{filename}\n"
            f" # or\n"
            f" ./run_agent.sh {agent_name}\n\n"
            f"[cyan]Test with verbose output:[/cyan]\n"
            f" ./run_agent.sh {agent_name} --verbose\n\n"
            f"[cyan]List all available agents:[/cyan]\n"
            f" ./run_agent.sh list\n\n"
            f"[dim]Configuration file: agents/{filename}[/dim]",
            title="Next Steps",
            title_align="left"
        )

        console.print(usage_panel)

    # Validation and testing methods

    def validate_url(self, url: str) -> bool:
        """Return True when ``url`` parses with both a scheme and a network location."""
        try:
            parsed = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string input is simply "not a valid URL".
            return False
        return bool(parsed.scheme and parsed.netloc)

    def validate_did(self, did: str) -> bool:
        """Return True when ``did`` starts with ``did:`` and is longer than 10 characters."""
        # Basic DID format validation
        return did.startswith("did:") and len(did) > 10

    def test_bluesky_auth(self) -> bool:
        """Try to create a session with the credentials in ``self.config["bluesky"]``.

        Returns:
            True when the PDS answers with HTTP 200, False on any other
            status, on a connection error, or when the config is incomplete.
        """
        try:
            bluesky = self.config["bluesky"]
            auth_data = {
                "identifier": bluesky["username"],
                "password": bluesky["password"]
            }

            # Strip a trailing slash so the URL never contains "//xrpc".
            pds_uri = bluesky["pds_uri"].rstrip("/")
            auth_url = f"{pds_uri}/xrpc/com.atproto.server.createSession"

            response = requests.post(
                auth_url,
                json=auth_data,
                timeout=10,
                headers={"Content-Type": "application/json"}
            )
        except (requests.RequestException, KeyError) as e:
            console.print(f"[red]Connection error: {escape(str(e))}[/red]")
            return False

        return response.status_code == 200

    def get_letta_agents(self, api_key: str) -> Optional[List[Dict[str, Any]]]:
        """Fetch the agent list from the Letta API.

        Args:
            api_key: Bearer token sent in the Authorization header.

        Returns:
            A list of agent dicts that each carry a non-empty ``id``, or
            None when the request fails, the status is not 200, or the
            response has an unrecognised shape.
        """
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        try:
            response = requests.get(
                self.LETTA_AGENTS_URL,
                headers=headers,
                timeout=10
            )
            if response.status_code != 200:
                console.print(
                    f"[yellow]Could not fetch agents: HTTP {response.status_code}[/yellow]"
                )
                return None
            agents_data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            console.print(f"[yellow]Could not fetch agents: {escape(str(e))}[/yellow]")
            return None

        # Handle different response formats: a bare list, or a dict wrapping
        # the list under "agents" (preferred) or "data".
        if isinstance(agents_data, dict):
            if "agents" in agents_data:
                agents_data = agents_data["agents"]
            elif "data" in agents_data:
                agents_data = agents_data["data"]
            else:
                return None

        if not isinstance(agents_data, list):
            return None

        # Entries without an id cannot be selected, so drop them up front.
        return [
            agent for agent in agents_data
            if isinstance(agent, dict) and agent.get("id")
        ]

    async def run(self):
        """Run the complete setup wizard.

        Exits with status 0 on Ctrl-C and status 1 on any other error.
        """
        try:
            self.welcome()
            self.get_agent_basic_info()
            self.get_bluesky_config()
            # NOTE(review): get_jetstream_config() and get_additional_config()
            # are not part of this flow; get_letta_config() writes default
            # jetstream/cache/bridge sections instead. Confirm this is
            # intentional, since the welcome text mentions DIDs to monitor.
            self.get_letta_config()
            self.get_listener_config()
            self.save_config()

        except KeyboardInterrupt:
            console.print("\n\n[yellow]🛑 Setup cancelled by user[/yellow]")
            sys.exit(0)
        except Exception as e:
            # Top-level boundary: report and exit instead of dumping a traceback.
            console.print(f"\n[red]❌ Setup error: {escape(str(e))}[/red]")
            sys.exit(1)


def main():
    """Main entry point."""
    wizard = SetupWizard()
    asyncio.run(wizard.run())


if __name__ == "__main__":
    main()