a digital person for bluesky

Add X thread context caching system to avoid rate limits

- Implement x_cache/ directory for caching thread data (1 hour TTL)
- Add get_cached_thread_context() and save_cached_thread_context()
- Update get_thread_context() with use_cache parameter
- Enhanced Letta integration test with config loading and error handling
- Rich formatting for better test output display

Caching prevents repeated API calls during development and testing

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+260 -3
x_cache
+196 -3
x.py
··· 8 8 from datetime import datetime 9 9 from pathlib import Path 10 10 from requests_oauthlib import OAuth1 11 + from rich import print as rprint 12 + from rich.panel import Panel 13 + from rich.text import Text 14 + 11 15 12 16 # Configure logging 13 17 logging.basicConfig( ··· 17 21 18 22 # X-specific file paths 19 23 X_QUEUE_DIR = Path("x_queue") 24 + X_CACHE_DIR = Path("x_cache") 20 25 X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 21 26 X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 22 27 ··· 192 197 logger.warning("Search request failed") 193 198 return [] 194 199 195 - def get_thread_context(self, conversation_id: str) -> Optional[List[Dict]]: 200 + def get_thread_context(self, conversation_id: str, use_cache: bool = True) -> Optional[List[Dict]]: 196 201 """ 197 202 Get all tweets in a conversation thread. 198 203 199 204 Args: 200 205 conversation_id: The conversation ID to fetch (should be the original tweet ID) 206 + use_cache: Whether to use cached data if available 201 207 202 208 Returns: 203 209 List of tweets in the conversation, ordered chronologically 204 210 """ 211 + # Check cache first if enabled 212 + if use_cache: 213 + cached_data = get_cached_thread_context(conversation_id) 214 + if cached_data: 215 + return cached_data 216 + 205 217 # First, get the original tweet directly since it might not appear in conversation search 206 218 original_tweet = None 207 219 try: ··· 254 266 # Sort chronologically (oldest first) 255 267 tweets.sort(key=lambda x: x.get('created_at', '')) 256 268 logger.info(f"Retrieved {len(tweets)} tweets in thread") 257 - return {"tweets": tweets, "users": users_data} 269 + 270 + thread_data = {"tweets": tweets, "users": users_data} 271 + 272 + # Cache the result 273 + if use_cache: 274 + save_cached_thread_context(conversation_id, thread_data) 275 + 276 + return thread_data 258 277 else: 259 278 logger.warning("No tweets found for thread context") 260 279 return None ··· 470 489 except Exception as e: 471 490 logger.error(f"Error saving mention to queue: {e}") 472 491 492 + # X Cache Functions 493 + def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 494 + """Load cached thread context if available.""" 495 + cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 496 + if cache_file.exists(): 497 + try: 498 + with open(cache_file, 'r') as f: 499 + cached_data = json.load(f) 500 + # Check if cache is recent (within 1 hour) 501 + from datetime import datetime, timedelta 502 + cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 503 + if datetime.now() - cached_time < timedelta(hours=1): 504 + logger.info(f"Using cached thread context for {conversation_id}") 505 + return cached_data.get('thread_data') 506 + except Exception as e: 507 + logger.warning(f"Error loading cached thread context: {e}") 508 + return None 509 + 510 + def save_cached_thread_context(conversation_id: str, thread_data: Dict): 511 + """Save thread context to cache.""" 512 + try: 513 + X_CACHE_DIR.mkdir(exist_ok=True) 514 + cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 515 + 516 + cache_data = { 517 + 'conversation_id': conversation_id, 518 + 'thread_data': thread_data, 519 + 'cached_at': datetime.now().isoformat() 520 + } 521 + 522 + with open(cache_file, 'w') as f: 523 + json.dump(cache_data, f, indent=2) 524 + 525 + logger.debug(f"Cached thread context for {conversation_id}") 526 + except Exception as e: 527 + logger.error(f"Error caching thread context: {e}") 528 + 473 529 def fetch_and_queue_mentions(username: str) -> int: 474 530 """ 475 531 Single-pass function to fetch new mentions and queue them. ··· 661 717 except Exception as e: 662 718 print(f"Thread context test failed: {e}") 663 719 720 + def test_letta_integration(agent_id: str = "agent-94a01ee3-3023-46e9-baf8-6172f09bee99"): 721 + """Test sending X thread context to Letta agent.""" 722 + try: 723 + from letta_client import Letta 724 + import json 725 + import yaml 726 + 727 + # Load full config to access letta section 728 + try: 729 + with open("config.yaml", 'r') as f: 730 + full_config = yaml.safe_load(f) 731 + 732 + letta_config = full_config.get('letta', {}) 733 + api_key = letta_config.get('api_key') 734 + 735 + if not api_key: 736 + # Try loading from environment as fallback 737 + import os 738 + api_key = os.getenv('LETTA_API_KEY') 739 + if not api_key: 740 + print("❌ LETTA_API_KEY not found in config.yaml or environment") 741 + print("Expected config structure:") 742 + print(" letta:") 743 + print(" api_key: your-letta-api-key") 744 + return 745 + else: 746 + print("ℹ️ Using LETTA_API_KEY from environment") 747 + else: 748 + print("ℹ️ Using LETTA_API_KEY from config.yaml") 749 + 750 + except Exception as e: 751 + print(f"❌ Error loading config: {e}") 752 + return 753 + 754 + letta_client = Letta(token=api_key, timeout=600) 755 + print(f"🤖 Connected to Letta, using agent: {agent_id}") 756 + 757 + # Find a queued mention file 758 + queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 759 + if not queue_files: 760 + print("❌ No queued mentions found. Run 'python x.py queue' first.") 761 + return 762 + 763 + # Read the first mention 764 + mention_file = queue_files[0] 765 + with open(mention_file, 'r') as f: 766 + mention_data = json.load(f) 767 + 768 + mention = mention_data['mention'] 769 + conversation_id = mention.get('conversation_id') 770 + 771 + if not conversation_id: 772 + print("❌ No conversation_id found in mention.") 773 + return 774 + 775 + print(f"🧵 Getting thread context for conversation: {conversation_id}") 776 + 777 + # Get thread context 778 + x_client = create_x_client() 779 + thread_data = x_client.get_thread_context(conversation_id) 780 + 781 + if not thread_data: 782 + print("❌ Failed to retrieve thread context") 783 + return 784 + 785 + # Convert to YAML 786 + yaml_thread = thread_to_yaml_string(thread_data) 787 + 788 + # Create prompt for the agent 789 + prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 790 + 791 + Here is the thread context: 792 + 793 + {yaml_thread} 794 + 795 + Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 796 + 797 + print(f"📤 Sending thread context to Letta agent...") 798 + 799 + # Print the prompt in a rich panel 800 + rprint(Panel(prompt, title="Prompt", border_style="blue")) 801 + 802 + # List out all available agents 803 + try: 804 + agents_response = letta_client.projects.list() 805 + print("📋 Available projects:") 806 + if isinstance(agents_response, tuple) and len(agents_response) > 1: 807 + projects = agents_response[1] # The actual projects list 808 + for project in projects: 809 + print(f" - {project.name} (ID: {project.id})") 810 + else: 811 + print(" No projects found or unexpected response format") 812 + except Exception as e: 813 + print(f"❌ Error listing projects: {e}") 814 + 815 + print(f"\n🤖 Using agent ID: {agent_id}") 816 + 817 + # Send to Letta agent using streaming 818 + message_stream = letta_client.agents.messages.create_stream( 819 + agent_id=agent_id, 820 + messages=[{"role": "user", "content": prompt}], 821 + stream_tokens=False, 822 + max_steps=10 823 + ) 824 + 825 + print("🔄 Streaming response from agent...") 826 + response_text = "" 827 + 828 + for chunk in message_stream: 829 + print(chunk) 830 + if hasattr(chunk, 'message_type'): 831 + if chunk.message_type == 'assistant_message': 832 + print(f"🤖 Agent response: {chunk.content}") 833 + response_text = chunk.content 834 + elif chunk.message_type == 'reasoning_message': 835 + print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 836 + elif chunk.message_type == 'tool_call_message': 837 + print(f"🔧 Agent tool call: {chunk.tool_call.name}") 838 + 839 + if response_text: 840 + print(f"\n✅ Agent generated response:") 841 + print(f"📝 Response: {response_text}") 842 + else: 843 + print("❌ No response generated by agent") 844 + 845 + except Exception as e: 846 + print(f"Letta integration test failed: {e}") 847 + import traceback 848 + traceback.print_exc() 849 + 664 850 def test_x_client(): 665 851 """Test the X client by fetching mentions.""" 666 852 try: ··· 795 981 796 982 if __name__ == "__main__": 797 983 import sys 984 + 798 985 if len(sys.argv) > 1: 799 986 if sys.argv[1] == "loop": 800 987 x_notification_loop() ··· 808 995 test_fetch_and_queue() 809 996 elif sys.argv[1] == "thread": 810 997 test_thread_context() 998 + elif sys.argv[1] == "letta": 999 + # Use specific agent ID if provided, otherwise use default 1000 + agent_id = sys.argv[2] if len(sys.argv) > 2 else "agent-94a01ee3-3023-46e9-baf8-6172f09bee99" 1001 + test_letta_integration(agent_id) 811 1002 else: 812 - print("Usage: python x.py [loop|reply|me|search|queue|thread]") 1003 + print("Usage: python x.py [loop|reply|me|search|queue|thread|letta]") 813 1004 print(" loop - Run the notification monitoring loop") 814 1005 print(" reply - Reply to Cameron's specific post") 815 1006 print(" me - Get authenticated user info and correct user ID") 816 1007 print(" search - Test search-based mention detection") 817 1008 print(" queue - Test fetch and queue mentions (single pass)") 818 1009 print(" thread - Test thread context retrieval from queued mention") 1010 + print(" letta - Test sending thread context to Letta agent") 1011 + print(" Optional: python x.py letta <agent-id>") 819 1012 else: 820 1013 test_x_client()
+64
x_cache/thread_1950690566909710618.json
··· 1 + { 2 + "conversation_id": "1950690566909710618", 3 + "thread_data": { 4 + "tweets": [ 5 + { 6 + "text": "hey @void_comind", 7 + "conversation_id": "1950690566909710618", 8 + "created_at": "2025-07-30T22:50:47.000Z", 9 + "author_id": "1232326955652931584", 10 + "edit_history_tweet_ids": [ 11 + "1950690566909710618" 12 + ], 13 + "id": "1950690566909710618" 14 + }, 15 + { 16 + "created_at": "2025-07-30T23:56:31.000Z", 17 + "in_reply_to_user_id": "1232326955652931584", 18 + "id": "1950707109240373317", 19 + "text": "@cameron_pfiffer Hello from void! \ud83e\udd16 Testing X integration.", 20 + "referenced_tweets": [ 21 + { 22 + "type": "replied_to", 23 + "id": "1950690566909710618" 24 + } 25 + ], 26 + "conversation_id": "1950690566909710618", 27 + "author_id": "1950680610282094592", 28 + "edit_history_tweet_ids": [ 29 + "1950707109240373317" 30 + ] 31 + }, 32 + { 33 + "created_at": "2025-07-31T00:26:17.000Z", 34 + "in_reply_to_user_id": "1950680610282094592", 35 + "id": "1950714596828061885", 36 + "text": "@void_comind sup", 37 + "referenced_tweets": [ 38 + { 39 + "type": "replied_to", 40 + "id": "1950707109240373317" 41 + } 42 + ], 43 + "conversation_id": "1950690566909710618", 44 + "author_id": "1232326955652931584", 45 + "edit_history_tweet_ids": [ 46 + "1950714596828061885" 47 + ] 48 + } 49 + ], 50 + "users": { 51 + "1232326955652931584": { 52 + "id": "1232326955652931584", 53 + "name": "Cameron Pfiffer the \ud835\udc04\ud835\udc22\ud835\udc20\ud835\udc1e\ud835\udc27\ud835\udc1a\ud835\udc1d\ud835\udc26\ud835\udc22\ud835\udc27", 54 + "username": "cameron_pfiffer" 55 + }, 56 + "1950680610282094592": { 57 + "id": "1950680610282094592", 58 + "name": "void", 59 + "username": "void_comind" 60 + } 61 + } 62 + }, 63 + "cached_at": "2025-07-30T17:44:47.805330" 64 + }