a digital person for bluesky
at main 5.2 kB view raw
1#!/usr/bin/env python3 2""" 3Simplified minimal reproducible example for Letta dynamic block loading issue. 4 5This demonstrates the core issue: 61. A tool attaches a new block to an agent 72. Memory functions fail because agent_state doesn't reflect the new block 8""" 9 10import os 11import logging 12from dotenv import load_dotenv 13from letta_client import Letta 14 15logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') 16logger = logging.getLogger(__name__) 17 18load_dotenv() 19 20 21def main(): 22 """Demonstrate the dynamic block loading issue.""" 23 client = Letta(token=os.environ["LETTA_API_KEY"]) 24 25 # Use an existing agent or create one using the utils 26 agent_name = "test_block_issue" 27 28 # First, let's see if we can create an agent using the API directly 29 logger.info("Looking for or creating test agent...") 30 31 agents = client.agents.list() 32 agent = None 33 for a in agents: 34 if a.name == agent_name: 35 agent = a 36 logger.info(f"Found existing agent: {agent.name}") 37 break 38 39 if not agent: 40 # Create using simple params that work 41 agent = client.agents.create( 42 name=agent_name 43 ) 44 logger.info(f"Created agent: {agent.name} (ID: {agent.id})") 45 46 try: 47 # Get initial blocks 48 initial_blocks = client.agents.blocks.list(agent_id=str(agent.id)) 49 initial_labels = {block.label for block in initial_blocks} 50 logger.info(f"Initial blocks: {initial_labels}") 51 52 # Step 1: Create and attach a new block 53 test_handle = "testuser.bsky.social" 54 block_label = f"user_{test_handle.replace('.', '_')}" 55 56 # Check if block already attached 57 if block_label in initial_labels: 58 logger.info(f"Block {block_label} already attached, skipping creation") 59 else: 60 logger.info(f"Creating block: {block_label}") 61 62 # Check if block exists 63 existing_blocks = client.blocks.list(label=block_label) 64 if existing_blocks: 65 new_block = existing_blocks[0] 66 logger.info(f"Using existing block with ID: {new_block.id}") 67 else: 68 # Create the block 69 new_block = client.blocks.create( 70 label=block_label, 71 value=f"# User: {test_handle}\n\nInitial content.", 72 limit=5000 73 ) 74 logger.info(f"Created block with ID: {new_block.id}") 75 76 # Attach to agent 77 logger.info("Attaching block to agent...") 78 client.agents.blocks.attach( 79 agent_id=str(agent.id), 80 block_id=str(new_block.id) 81 ) 82 83 # Verify attachment via API 84 blocks_after = client.agents.blocks.list(agent_id=str(agent.id)) 85 labels_after = {block.label for block in blocks_after} 86 logger.info(f"Blocks after attachment: {labels_after}") 87 88 if block_label in labels_after: 89 logger.info("✓ Block successfully attached via API") 90 else: 91 logger.error("✗ Block NOT found via API after attachment") 92 93 # Step 2: Send a message asking the agent to use memory_insert on the new block 94 logger.info(f"\nAsking agent to update the newly attached block...") 95 96 from letta_client import MessageCreate 97 98 response = client.agents.messages.create( 99 agent_id=str(agent.id), 100 messages=[MessageCreate(role="user", content=f"Use memory_insert to add this text to the '{block_label}' memory block: 'This user likes AI and technology.'")] 101 ) 102 103 # Check for errors in the response 104 error_found = False 105 for message in response.messages: 106 if hasattr(message, 'text') and message.text: 107 logger.info(f"Agent: {message.text}") 108 109 # Look for tool returns with errors 110 if hasattr(message, 'type') and message.type == 'tool_return': 111 if hasattr(message, 'status') and message.status == 'error': 112 error_found = True 113 logger.error("ERROR REPRODUCED!") 114 if hasattr(message, 'tool_return'): 115 logger.error(f"Tool error: {message.tool_return}") 116 if hasattr(message, 'stderr') and message.stderr: 117 for err in message.stderr: 118 logger.error(f"Stderr: {err}") 119 120 if not error_found: 121 logger.info("No error found - checking if operation succeeded...") 122 123 # Get the block content to see if it was updated 124 updated_blocks = client.blocks.list(label=block_label) 125 if updated_blocks: 126 logger.info(f"Block content:\n{updated_blocks[0].value}") 127 128 finally: 129 # Cleanup - always delete test agent 130 if agent and agent.name == agent_name: 131 logger.info(f"\nDeleting test agent {agent.name}") 132 client.agents.delete(agent_id=str(agent.id)) 133 134 135if __name__ == "__main__": 136 main()