A digital person for Bluesky
#!/usr/bin/env python3
"""
Simple CLI tool to converse with the Kaleidoscope collective.
"""

import ast
import os
import sys

from dotenv import load_dotenv
from letta_client import Letta
from rich.console import Console
from rich.prompt import Prompt
from rich.panel import Panel
from rich.text import Text

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config_loader import get_config

load_dotenv()


class KaleidoscopeChat:
    """Interactive terminal chat session with the Kaleidoscope collective group.

    Constructing an instance creates the Letta client, resolves the group to
    talk to and prints a banner; :meth:`run` then drives the prompt loop.
    """

    # Substrings of a sender name that get the "lens" colour and icon.
    _LENS_KEYWORDS = ("lens", "pattern", "creative", "systems", "temporal")
    # Tool whose return value is rendered as the list of lens perspectives.
    _BROADCAST_TOOL = "send_message_to_all_agents_in_group"

    def __init__(self):
        """Initialize the Kaleidoscope chat client.

        Raises:
            ValueError: if no project ID is configured, or if the Kaleidoscope
                group cannot be located.
        """
        self.console = Console()
        self.config = get_config()

        # Initialize Letta client (config values win, environment is the fallback)
        self.letta_client = Letta(
            base_url=self.config.get('letta.base_url', os.environ.get('LETTA_BASE_URL')),
            token=self.config.get('letta.api_key', os.environ.get('LETTA_API_KEY')),
            timeout=self.config.get('letta.timeout', 30)
        )

        # Get project ID
        self.project_id = self.config.get('letta.project_id', os.environ.get('LETTA_PROJECT_ID'))
        if not self.project_id:
            raise ValueError("Project ID must be set in config.yaml under letta.project_id or as LETTA_PROJECT_ID environment variable")

        # Find the Kaleidoscope collective group
        self.group_id = self._find_kaleidoscope_group()

        self.console.print(Panel.fit(
            "[bold cyan]Kaleidoscope Collective Chat[/bold cyan]\n"
            f"Connected to group: {self.group_id}\n"
            "Type 'exit' or 'quit' to leave, 'help' for commands",
            title="🔮 Kaleidoscope Chat"
        ))

    def _find_kaleidoscope_group(self) -> str:
        """Find the Kaleidoscope collective group.

        Three strategies are tried in order: a group managed by the
        ``kaleidoscope-central`` agent, a group whose description mentions
        "the kaleidoscope", and finally any group containing an agent tagged
        ``kaleidoscope-lens``.

        Returns:
            The ID of the first matching group.

        Raises:
            ValueError: if the central agent or the group cannot be found, or
                if any lookup fails (the original error is chained as the cause).
        """
        try:
            self.console.print("[dim]Searching for Kaleidoscope collective group...[/dim]")

            # First, get the kaleidoscope-central agent ID
            kaleidoscope_agents = self.letta_client.agents.list(name="kaleidoscope-central")
            if not kaleidoscope_agents:
                raise ValueError("Kaleidoscope central agent not found. Run create_kaleidoscope.py first.")

            kaleidoscope_central_id = kaleidoscope_agents[0].id
            self.console.print(f"[dim]Found kaleidoscope-central: {kaleidoscope_central_id[:8]}[/dim]")

            # List groups. NOTE: the previous code issued this same unfiltered
            # call twice while logging "with/without project filter"; no filter
            # was ever applied, so a single call with an honest message is used.
            try:
                groups = self.letta_client.groups.list()
                self.console.print(f"[dim]Found {len(groups)} groups[/dim]")
            except Exception:
                # Best effort: a failed listing is treated as "no groups".
                groups = []
                self.console.print("[dim]No groups found[/dim]")

            # Look for groups with kaleidoscope-central as supervisor
            for group in groups:
                manager_config = getattr(group, 'manager_config', None)
                if not manager_config or not hasattr(manager_config, 'manager_agent_id'):
                    continue
                if manager_config.manager_agent_id == kaleidoscope_central_id:
                    self.console.print(f"[dim]Found Kaleidoscope group: {group.id[:12]}[/dim]")
                    return group.id

            # Look for the kaleidoscope group by description
            for group in groups:
                description = getattr(group, 'description', None)
                if description and 'the kaleidoscope' in description.lower():
                    self.console.print(f"[dim]Found Kaleidoscope group by description: {group.id[:12]}[/dim]")
                    return group.id

            # If still not found, try to find any group with kaleidoscope lenses
            lens_agents = self.letta_client.agents.list(project_id=self.project_id, tags=["kaleidoscope-lens"])
            if lens_agents:
                lens_ids = {lens.id for lens in lens_agents}
                for group in groups:
                    try:
                        members = self.letta_client.groups.agents.list(group_id=group.id)
                    except Exception:
                        # A group whose members cannot be listed is skipped.
                        continue
                    member_ids = {member.id for member in members}
                    if lens_ids & member_ids:  # If any lens is in this group
                        self.console.print(f"[dim]Found group with Kaleidoscope lenses: {group.id[:12]}[/dim]")
                        return group.id

            raise ValueError("Kaleidoscope collective group not found. Run 'python organon/create_kaleidoscope.py' to create the group.")

        except Exception as e:
            # Every failure (including the ValueErrors above) is reported with
            # the same prefix; chain the cause so the traceback is preserved.
            raise ValueError(f"Error finding Kaleidoscope group: {e}") from e

    def _sender_name(self, message, cache) -> str:
        """Return a display name for the agent that produced *message*.

        *cache* maps agent IDs to names already resolved for the current
        response, so each agent is retrieved at most once per response.
        """
        agent_id = getattr(message, 'agent_id', None)
        if not agent_id:
            return "Unknown"

        if agent_id not in cache:
            fallback = f"Agent {agent_id[:8]}"
            try:
                agent = self.letta_client.agents.retrieve(agent_id=agent_id)
                cache[agent_id] = getattr(agent, 'name', None) or fallback
            except Exception:
                cache[agent_id] = fallback
        return cache[agent_id]

    @staticmethod
    def _format_lens_perspectives(tool_return) -> str:
        """Render a broadcast tool return as a bulleted list of perspectives.

        *tool_return* is parsed as a Python literal; when it is not a
        list/tuple/set literal the raw value is shown as a single bullet.
        """
        header = "Lens perspectives:"
        try:
            # literal_eval only evaluates literals, never arbitrary code.
            parsed = ast.literal_eval(tool_return)
        except (ValueError, SyntaxError, TypeError):
            parsed = None

        if isinstance(parsed, (list, tuple, set)):
            return header + "".join(f"\n - {item}" for item in parsed)
        # Fallback if parsing fails or yields something that is not a sequence
        return f"{header}\n - {tool_return}"

    def _message_content(self, message) -> str:
        """Extract the displayable text of *message*, always as a ``str``."""
        for attr in ('text', 'content', 'message'):
            value = getattr(message, attr, None)
            if value is not None:
                # Rich's Text requires a str; other payloads are stringified.
                return value if isinstance(value, str) else str(value)

        is_broadcast_return = (
            getattr(message, 'message_type', None) == "tool_return_message"
            and getattr(message, 'name', None) == self._BROADCAST_TOOL
        )
        if is_broadcast_return:
            return self._format_lens_perspectives(getattr(message, 'tool_return', None))
        return str(message)

    def _sender_style(self, sender: str):
        """Return ``(border_color, icon)`` for a sender name."""
        lowered = sender.lower()
        if "central" in lowered:
            return "cyan", "🧠"
        if any(keyword in lowered for keyword in self._LENS_KEYWORDS):
            return "magenta", "🔮"
        return "white", "💬"

    def _display_response(self, response):
        """Display the group response in a formatted way.

        Each message in ``response.messages`` is printed in its own panel,
        titled with the sender and coloured by sender type. A notice is
        printed when there are no messages.
        """
        messages = getattr(response, 'messages', None)
        if not messages:
            self.console.print("[yellow]No response messages received[/yellow]")
            return

        sender_cache = {}
        for message in messages:
            sender = self._sender_name(message, sender_cache)
            content = self._message_content(message)
            border_color, icon = self._sender_style(sender)

            self.console.print(Panel(
                Text(content, style="white"),
                title=f"{icon} {sender}",
                border_style=border_color
            ))

    def run(self):
        """Run the interactive chat loop.

        Reads input until the user types an exit command or interrupts the
        prompt (Ctrl-C / Ctrl-D). ``help`` and ``info`` are handled locally;
        any other non-empty input is sent to the group and the reply shown.
        """
        try:
            while True:
                # Get user input
                user_input = Prompt.ask("\n[bold green]You[/bold green]")
                command = user_input.strip().lower()

                # Handle special commands
                if not command:
                    continue
                if command in ('exit', 'quit', 'q'):
                    self.console.print("[yellow]Goodbye![/yellow]")
                    break
                if command == 'help':
                    self.console.print(Panel(
                        "[bold]Available commands:[/bold]\n"
                        "• Type any message to send to the Kaleidoscope collective\n"
                        "'help' - Show this help message\n"
                        "'info' - Show group information\n"
                        "'exit', 'quit', 'q' - Exit the chat",
                        title="Help"
                    ))
                    continue
                if command == 'info':
                    self._show_group_info()
                    continue

                # Send message to group
                self.console.print("[dim]Sending to Kaleidoscope collective...[/dim]")

                try:
                    response = self.letta_client.groups.messages.create(
                        group_id=self.group_id,
                        messages=[{
                            "role": "user",
                            "content": user_input
                        }]
                    )

                    self.console.print("\n[bold]Kaleidoscope Collective Response:[/bold]")
                    self._display_response(response)

                except Exception as e:
                    # A failed send must not end the session.
                    self.console.print(f"[red]Error sending message: {e}[/red]")

        except (KeyboardInterrupt, EOFError):
            # EOFError is what the prompt raises on Ctrl-D / closed stdin.
            self.console.print("\n[yellow]Chat interrupted. Goodbye![/yellow]")
        except Exception as e:
            self.console.print(f"[red]Unexpected error: {e}[/red]")

    def _show_group_info(self):
        """Show information about the Kaleidoscope group.

        Prints the group ID, its description (when the attribute exists), the
        member agents and the managing agent. Any failure is reported on the
        console instead of being raised.
        """
        try:
            group = self.letta_client.groups.retrieve(group_id=self.group_id)
            agents = self.letta_client.groups.agents.list(group_id=self.group_id)

            info_text = f"[bold]Group ID:[/bold] {self.group_id}\n"
            if hasattr(group, 'description'):
                info_text += f"[bold]Description:[/bold] {group.description}\n"

            info_text += f"[bold]Perspective Lenses:[/bold] {len(agents)}\n"

            for agent in agents:
                short_id = agent.id[:8]
                try:
                    agent_detail = self.letta_client.agents.retrieve(agent_id=agent.id)
                    name = agent_detail.name if hasattr(agent_detail, 'name') else short_id
                except Exception:
                    name = short_id
                info_text += f"{name}\n"

            # Show supervisor info
            manager_config = getattr(group, 'manager_config', None)
            manager_id = getattr(manager_config, 'manager_agent_id', None) if manager_config else None
            if manager_id:
                try:
                    supervisor = self.letta_client.agents.retrieve(agent_id=manager_id)
                    supervisor_name = supervisor.name if hasattr(supervisor, 'name') else manager_id[:8]
                except Exception:
                    supervisor_name = manager_id[:8]
                info_text += f"[bold]Central Synthesizer:[/bold] {supervisor_name}\n"

            self.console.print(Panel(info_text, title="Group Information"))

        except Exception as e:
            self.console.print(f"[red]Error getting group info: {e}[/red]")


def main():
    """Main function.

    Builds the chat client and runs it; on failure the error is printed and
    the process exits with status 1.
    """
    try:
        chat = KaleidoscopeChat()
        chat.run()
    except Exception as e:
        console = Console()
        console.print(f"[red]Failed to initialize chat: {e}[/red]")
        sys.exit(1)


if __name__ == "__main__":
    main()