a digital person for bluesky
1#!/usr/bin/env python3
2"""Test script for X user block management functionality."""
3
4import sys
5import json
6from pathlib import Path
7
8# Add the current directory to path so we can import modules
9sys.path.append(str(Path(__file__).parent))
10
11from x import ensure_x_user_blocks_attached, get_cached_thread_context
12from tools.blocks import x_user_note_view
13
14def test_x_user_blocks():
15 """Test X user block creation and attachment."""
16
17 # Ensure we're using config file, not environment variable
18 import os
19 if 'LETTA_API_KEY' in os.environ:
20 print("⚠️ Removing LETTA_API_KEY environment variable to use config file")
21 del os.environ['LETTA_API_KEY']
22
23 # Use the cached thread data for testing
24 conversation_id = "1950690566909710618"
25 thread_data = get_cached_thread_context(conversation_id)
26
27 if not thread_data:
28 print("❌ No cached thread data found")
29 return False
30
31 print(f"✅ Loaded cached thread data for conversation {conversation_id}")
32 print(f"📊 Users in thread: {list(thread_data.get('users', {}).keys())}")
33
34 # Get the configured void agent ID
35 from config_loader import get_letta_config
36 letta_config = get_letta_config()
37 agent_id = letta_config['agent_id']
38 print(f"🎯 Using configured void agent {agent_id} for testing")
39
40 try:
41 # Test user block attachment
42 print(f"\n🔗 Testing X user block attachment for agent {agent_id}")
43 ensure_x_user_blocks_attached(thread_data, agent_id)
44 print("✅ X user block attachment completed")
45
46 # Test viewing created blocks
47 print("\n👀 Testing X user block viewing:")
48 for user_id in thread_data.get('users', {}):
49 try:
50 # Create mock agent state for testing
51 class MockAgentState:
52 def __init__(self, agent_id):
53 self.id = agent_id
54
55 agent_state = MockAgentState(agent_id)
56 content = x_user_note_view(user_id, agent_state)
57 print(f"📋 Block content for user {user_id}:")
58 print(content)
59 print("-" * 50)
60 except Exception as e:
61 print(f"❌ Error viewing block for user {user_id}: {e}")
62
63 return True
64
65 except Exception as e:
66 print(f"❌ Error testing X user blocks: {e}")
67 return False
68
69if __name__ == "__main__":
70 print("🧪 Testing X User Block Management")
71 print("=" * 40)
72
73 success = test_x_user_blocks()
74
75 if success:
76 print("\n✅ All X user block tests completed successfully!")
77 else:
78 print("\n❌ X user block tests failed!")
79 sys.exit(1)