a digital person for bluesky

Add complete X (Twitter) integration with Letta agent support

## Core X Integration Features
- **X API Client**: Full X API v2 integration with OAuth 1.0a support
- **Queue System**: Fetch mentions and process separately for rate limit management
- **Thread Context**: Convert X conversations to YAML for agent comprehension
- **User Block Management**: X-specific user blocks with format `x_user_<user_id>`

## X Tool System
- **register_x_tools.py**: X-specific tool registration following Bluesky patterns
- **tools/x_thread.py**: X thread tool with 280 character limit validation
- **X_TOOL_APPROACH.md**: Documentation explaining tool-as-signal pattern

## Bot Functionality
- **process_x_mention()**: Complete X mention processing with Letta agent integration
- **X Reply Handling**: Thread construction and posting with proper reply chaining
- **Acknowledgment System**: X post acknowledgment tracking (file-based)
- **Testing Mode**: Safe testing without actual X posts

## Manual Queue Management
- `python x.py queue` - Fetch mentions only (rate limit safe)
- `python x.py process` - Process queued mentions only
- `python x.py bot` - Full bot loop (like bsky.py)

## Configuration Fix
- **config_loader.py**: Fixed to use config.yaml only, ignore environment variables

## Key Differences from Bluesky
- Character limit: 280 vs 300
- User identification: Numeric IDs vs handles
- Block format: `x_user_<user_id>` vs `user_<handle>`
- Acknowledgments: File-based vs stream.thought.ack API

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+112
X_TOOL_APPROACH.md
··· 1 + # X Tool Approach 2 + 3 + This document explains how X tools work differently from Bluesky tools, following the "keep it simple, stupid" principle. 4 + 5 + ## Core Philosophy 6 + 7 + X tools follow the same pattern as Bluesky tools in `bsky.py`: 8 + 9 + 1. **Tools are signals, not actions** - They don't perform network operations 10 + 2. **Handler processes tool calls** - The main bot loop (like `bsky.py`) processes tool results 11 + 3. **Atomic operations** - Each tool call represents one discrete action 12 + 4. **Thread construction** - Multiple tool calls are aggregated into threads by the handler 13 + 14 + ## X Tool Set 15 + 16 + The X tool set is intentionally minimal: 17 + 18 + ### Core Tools (Kept from Bluesky) 19 + - `halt_activity` - Signal to terminate the bot 20 + - `ignore_notification` - Explicitly ignore a notification without replying 21 + - `annotate_ack` - Add notes to acknowledgment records 22 + - `create_whitewind_blog_post` - Create blog posts (platform-agnostic) 23 + - `fetch_webpage` - Fetch and process web content (platform-agnostic) 24 + 25 + ### X-Specific User Block Tools 26 + - `attach_x_user_blocks` - Attach X user memory blocks (format: `x_user_<user_id>`) 27 + - `detach_x_user_blocks` - Detach X user memory blocks 28 + - `x_user_note_append` - Append to X user memory blocks 29 + - `x_user_note_replace` - Replace text in X user memory blocks 30 + - `x_user_note_set` - Set complete content of X user memory blocks 31 + - `x_user_note_view` - View X user memory block content 32 + 33 + ### X Thread Tool 34 + - `add_post_to_x_thread` - Signal to add a post to the reply thread (max 280 chars) 35 + 36 + ## Implementation Pattern 37 + 38 + ### Tool Implementation (`tools/x_thread.py`) 39 + ```python 40 + def add_post_to_x_thread(text: str) -> str: 41 + # Validate input 42 + if len(text) > 280: 43 + raise Exception(f"Text exceeds 280 character limit...") 44 + 45 + # Return confirmation - actual posting handled by x_bot.py 46 + return f"X post queued for reply thread: {text[:50]}..." 47 + ``` 48 + 49 + ### Handler Implementation (Future `x_bot.py`) 50 + ```python 51 + # Extract successful tool calls from agent response 52 + reply_candidates = [] 53 + for message in message_response.messages: 54 + if message.tool_call.name == 'add_post_to_x_thread': 55 + if tool_status == 'success': 56 + reply_candidates.append(text) 57 + 58 + # Aggregate into thread and post to X 59 + if reply_candidates: 60 + # Build thread structure 61 + # Post to X using x.py client 62 + # Handle threading/replies properly 63 + ``` 64 + 65 + ## Key Differences from Bluesky 66 + 67 + 1. **Character Limit**: X uses 280 characters vs Bluesky's 300 68 + 2. **User Identification**: X uses numeric user IDs vs Bluesky handles 69 + 3. **Block Format**: `x_user_<user_id>` vs `user_<handle>` 70 + 4. **No Language Parameter**: X doesn't use language codes like Bluesky 71 + 72 + ## Thread Context Integration 73 + 74 + X threads include user IDs for block management: 75 + 76 + ```yaml 77 + conversation: 78 + - text: "hey @void_comind" 79 + author: 80 + username: "cameron_pfiffer" 81 + name: "Cameron Pfiffer" 82 + author_id: "1232326955652931584" # Used for x_user_1232326955652931584 block 83 + ``` 84 + 85 + The `ensure_x_user_blocks_attached()` function in `x.py` automatically creates and attaches user blocks based on thread participants. 86 + 87 + ## Registration 88 + 89 + Use `register_x_tools.py` to register X-specific tools: 90 + 91 + ```bash 92 + # Register all X tools 93 + python register_x_tools.py 94 + 95 + # Register specific tools 96 + python register_x_tools.py --tools add_post_to_x_thread x_user_note_append 97 + 98 + # List available tools 99 + python register_x_tools.py --list 100 + ``` 101 + 102 + ## Future Implementation 103 + 104 + When creating the X bot handler (similar to `bsky.py`): 105 + 106 + 1. Parse X mentions/replies 107 + 2. Get thread context using `x.py` 108 + 3. Attach user blocks with `ensure_x_user_blocks_attached()` 109 + 4. Send to Letta agent with X tools 110 + 5. Process `add_post_to_x_thread` tool calls 111 + 6. Post replies to X using `x.py` client 112 + 7. Handle threading and reply context properly
+1 -1
config_loader.py
··· 172 172 """Get Letta configuration.""" 173 173 config = get_config() 174 174 return { 175 - 'api_key': config.get_required('letta.api_key', 'LETTA_API_KEY'), 175 + 'api_key': config.get_required('letta.api_key'), 176 176 'timeout': config.get('letta.timeout', 600), 177 177 'project_id': config.get_required('letta.project_id'), 178 178 'agent_id': config.get_required('letta.agent_id'),
+226
register_x_tools.py
··· 1 + #!/usr/bin/env python3 2 + """Register X-specific tools with a Letta agent.""" 3 + import logging 4 + from typing import List 5 + from letta_client import Letta 6 + from rich.console import Console 7 + from rich.table import Table 8 + from config_loader import get_letta_config 9 + 10 + # Import standalone functions and their schemas 11 + from tools.blocks import ( 12 + attach_x_user_blocks, detach_x_user_blocks, 13 + x_user_note_append, x_user_note_replace, x_user_note_set, x_user_note_view, 14 + AttachXUserBlocksArgs, DetachXUserBlocksArgs, 15 + XUserNoteAppendArgs, XUserNoteReplaceArgs, XUserNoteSetArgs, XUserNoteViewArgs 16 + ) 17 + from tools.halt import halt_activity, HaltArgs 18 + from tools.ignore import ignore_notification, IgnoreNotificationArgs 19 + from tools.whitewind import create_whitewind_blog_post, WhitewindPostArgs 20 + from tools.ack import annotate_ack, AnnotateAckArgs 21 + from tools.webpage import fetch_webpage, WebpageArgs 22 + 23 + # Import X thread tool 24 + from tools.x_thread import add_post_to_x_thread, XThreadPostArgs 25 + 26 + letta_config = get_letta_config() 27 + logging.basicConfig(level=logging.INFO) 28 + logger = logging.getLogger(__name__) 29 + console = Console() 30 + 31 + # X-specific tool configurations 32 + X_TOOL_CONFIGS = [ 33 + # Keep these core tools 34 + { 35 + "func": halt_activity, 36 + "args_schema": HaltArgs, 37 + "description": "Signal to halt all bot activity and terminate X bot", 38 + "tags": ["control", "halt", "terminate"] 39 + }, 40 + { 41 + "func": ignore_notification, 42 + "args_schema": IgnoreNotificationArgs, 43 + "description": "Explicitly ignore an X notification without replying", 44 + "tags": ["notification", "ignore", "control", "bot"] 45 + }, 46 + { 47 + "func": annotate_ack, 48 + "args_schema": AnnotateAckArgs, 49 + "description": "Add a note to the acknowledgment record for the current X interaction", 50 + "tags": ["acknowledgment", "note", "annotation", "metadata"] 51 + }, 52 + { 53 + "func": create_whitewind_blog_post, 54 + "args_schema": WhitewindPostArgs, 55 + "description": "Create a blog post on Whitewind with markdown support", 56 + "tags": ["whitewind", "blog", "post", "markdown"] 57 + }, 58 + { 59 + "func": fetch_webpage, 60 + "args_schema": WebpageArgs, 61 + "description": "Fetch a webpage and convert it to markdown/text format using Jina AI reader", 62 + "tags": ["web", "fetch", "webpage", "markdown", "jina"] 63 + }, 64 + 65 + # X user block management tools 66 + { 67 + "func": attach_x_user_blocks, 68 + "args_schema": AttachXUserBlocksArgs, 69 + "description": "Attach X user-specific memory blocks to the agent. Creates blocks if they don't exist.", 70 + "tags": ["memory", "blocks", "user", "x", "twitter"] 71 + }, 72 + { 73 + "func": detach_x_user_blocks, 74 + "args_schema": DetachXUserBlocksArgs, 75 + "description": "Detach X user-specific memory blocks from the agent. Blocks are preserved for later use.", 76 + "tags": ["memory", "blocks", "user", "x", "twitter"] 77 + }, 78 + { 79 + "func": x_user_note_append, 80 + "args_schema": XUserNoteAppendArgs, 81 + "description": "Append a note to an X user's memory block. Creates the block if it doesn't exist.", 82 + "tags": ["memory", "blocks", "user", "append", "x", "twitter"] 83 + }, 84 + { 85 + "func": x_user_note_replace, 86 + "args_schema": XUserNoteReplaceArgs, 87 + "description": "Replace text in an X user's memory block.", 88 + "tags": ["memory", "blocks", "user", "replace", "x", "twitter"] 89 + }, 90 + { 91 + "func": x_user_note_set, 92 + "args_schema": XUserNoteSetArgs, 93 + "description": "Set the complete content of an X user's memory block.", 94 + "tags": ["memory", "blocks", "user", "set", "x", "twitter"] 95 + }, 96 + { 97 + "func": x_user_note_view, 98 + "args_schema": XUserNoteViewArgs, 99 + "description": "View the content of an X user's memory block.", 100 + "tags": ["memory", "blocks", "user", "view", "x", "twitter"] 101 + }, 102 + 103 + # X thread tool 104 + { 105 + "func": add_post_to_x_thread, 106 + "args_schema": XThreadPostArgs, 107 + "description": "Add a single post to the current X reply thread atomically", 108 + "tags": ["x", "twitter", "reply", "thread", "atomic"] 109 + } 110 + ] 111 + 112 + def register_x_tools(agent_id: str = None, tools: List[str] = None): 113 + """Register X-specific tools with a Letta agent. 114 + 115 + Args: 116 + agent_id: ID of the agent to attach tools to. If None, uses config default. 117 + tools: List of tool names to register. If None, registers all tools. 118 + """ 119 + # Use agent ID from config if not provided 120 + if agent_id is None: 121 + agent_id = letta_config['agent_id'] 122 + 123 + try: 124 + # Initialize Letta client with API key from config 125 + client = Letta(token=letta_config['api_key'], timeout=letta_config['timeout']) 126 + 127 + # Get the agent by ID 128 + try: 129 + agent = client.agents.retrieve(agent_id=agent_id) 130 + except Exception as e: 131 + console.print(f"[red]Error: Agent '{agent_id}' not found[/red]") 132 + console.print(f"Error details: {e}") 133 + return 134 + 135 + # Filter tools if specific ones requested 136 + tools_to_register = X_TOOL_CONFIGS 137 + if tools: 138 + tools_to_register = [t for t in X_TOOL_CONFIGS if t["func"] and t["func"].__name__ in tools] 139 + if len(tools_to_register) != len(tools): 140 + registered_names = {t["func"].__name__ for t in tools_to_register if t["func"]} 141 + missing = set(tools) - registered_names 142 + console.print(f"[yellow]Warning: Unknown tools: {missing}[/yellow]") 143 + 144 + # Create results table 145 + table = Table(title=f"X Tool Registration for Agent '{agent.name}' ({agent_id})") 146 + table.add_column("Tool", style="cyan") 147 + table.add_column("Status", style="green") 148 + table.add_column("Description") 149 + 150 + # Register each tool 151 + for tool_config in tools_to_register: 152 + func = tool_config["func"] 153 + if not func: 154 + continue 155 + 156 + tool_name = func.__name__ 157 + 158 + try: 159 + # Create or update the tool using the standalone function 160 + created_tool = client.tools.upsert_from_function( 161 + func=func, 162 + args_schema=tool_config["args_schema"], 163 + tags=tool_config["tags"] 164 + ) 165 + 166 + # Get current agent tools 167 + current_tools = client.agents.tools.list(agent_id=str(agent.id)) 168 + tool_names = [t.name for t in current_tools] 169 + 170 + # Check if already attached 171 + if created_tool.name in tool_names: 172 + table.add_row(tool_name, "Already Attached", tool_config["description"]) 173 + else: 174 + # Attach to agent 175 + client.agents.tools.attach( 176 + agent_id=str(agent.id), 177 + tool_id=str(created_tool.id) 178 + ) 179 + table.add_row(tool_name, "✓ Attached", tool_config["description"]) 180 + 181 + except Exception as e: 182 + table.add_row(tool_name, f"✗ Error: {str(e)}", tool_config["description"]) 183 + logger.error(f"Error registering tool {tool_name}: {e}") 184 + 185 + console.print(table) 186 + 187 + except Exception as e: 188 + console.print(f"[red]Error: {str(e)}[/red]") 189 + logger.error(f"Fatal error: {e}") 190 + 191 + 192 + def list_available_x_tools(): 193 + """List all available X tools.""" 194 + table = Table(title="Available X Tools") 195 + table.add_column("Tool Name", style="cyan") 196 + table.add_column("Description") 197 + table.add_column("Tags", style="dim") 198 + 199 + for tool_config in X_TOOL_CONFIGS: 200 + if tool_config["func"]: 201 + table.add_row( 202 + tool_config["func"].__name__, 203 + tool_config["description"], 204 + ", ".join(tool_config["tags"]) 205 + ) 206 + 207 + console.print(table) 208 + 209 + 210 + if __name__ == "__main__": 211 + import argparse 212 + 213 + parser = argparse.ArgumentParser(description="Register X tools with a Letta agent") 214 + parser.add_argument("--agent-id", help=f"Agent ID (default: from config)") 215 + parser.add_argument("--tools", nargs="+", help="Specific tools to register (default: all)") 216 + parser.add_argument("--list", action="store_true", help="List available tools") 217 + 218 + args = parser.parse_args() 219 + 220 + if args.list: 221 + list_available_x_tools() 222 + else: 223 + # Use config default if no agent specified 224 + agent_id = args.agent_id if args.agent_id else letta_config['agent_id'] 225 + console.print(f"\n[bold]Registering X tools for agent: {agent_id}[/bold]\n") 226 + register_x_tools(agent_id, args.tools)
+42
tools/x_thread.py
··· 1 + """Thread tool for adding posts to X threads atomically.""" 2 + from typing import Optional 3 + from pydantic import BaseModel, Field, validator 4 + 5 + 6 + class XThreadPostArgs(BaseModel): 7 + text: str = Field( 8 + ..., 9 + description="Text content for the X post (max 280 characters)" 10 + ) 11 + 12 + @validator('text') 13 + def validate_text_length(cls, v): 14 + if len(v) > 280: 15 + raise ValueError(f"Text exceeds 280 character limit (current: {len(v)} characters)") 16 + return v 17 + 18 + 19 + def add_post_to_x_thread(text: str) -> str: 20 + """ 21 + Add a single post to the current X reply thread. This tool indicates to the handler 22 + that it should add this post to the ongoing reply thread context when responding to a mention. 23 + 24 + This is an atomic operation - each call adds exactly one post. The handler (x_bot.py or similar) 25 + manages the thread state and ensures proper threading when multiple posts are queued. 26 + 27 + Args: 28 + text: Text content for the post (max 280 characters) 29 + 30 + Returns: 31 + Confirmation message that the post has been queued for the reply thread 32 + 33 + Raises: 34 + Exception: If text exceeds character limit. On failure, the post will be omitted 35 + from the reply thread and the agent may try again with corrected text. 36 + """ 37 + # Validate input 38 + if len(text) > 280: 39 + raise Exception(f"Text exceeds 280 character limit (current: {len(text)} characters). This post will be omitted from the thread. You may try again with shorter text.") 40 + 41 + # Return confirmation - the actual posting will be handled by x_bot.py or similar 42 + return f"X post queued for reply thread: {text[:50]}{'...' if len(text) > 50 else ''}"
+586 -86
x.py
··· 792 792 except Exception as e: 793 793 print(f"Thread context test failed: {e}") 794 794 795 - def test_letta_integration(agent_id: str = "agent-94a01ee3-3023-46e9-baf8-6172f09bee99"): 795 + def test_letta_integration(agent_id: str = None): 796 796 """Test sending X thread context to Letta agent.""" 797 797 try: 798 798 from letta_client import Letta ··· 806 806 807 807 letta_config = full_config.get('letta', {}) 808 808 api_key = letta_config.get('api_key') 809 + config_agent_id = letta_config.get('agent_id') 810 + 811 + # Use agent_id from config if not provided as parameter 812 + if not agent_id: 813 + if config_agent_id: 814 + agent_id = config_agent_id 815 + print(f"ℹ️ Using agent_id from config: {agent_id}") 816 + else: 817 + print("❌ No agent_id found in config.yaml") 818 + print("Expected config structure:") 819 + print(" letta:") 820 + print(" agent_id: your-agent-id") 821 + return 822 + else: 823 + print(f"ℹ️ Using provided agent_id: {agent_id}") 809 824 810 825 if not api_key: 811 826 # Try loading from environment as fallback ··· 874 889 # Print the prompt in a rich panel 875 890 rprint(Panel(prompt, title="Prompt", border_style="blue")) 876 891 877 - # List out all available agents 878 - try: 879 - agents_response = letta_client.projects.list() 880 - print("📋 Available projects:") 881 - if isinstance(agents_response, tuple) and len(agents_response) > 1: 882 - projects = agents_response[1] # The actual projects list 883 - for project in projects: 884 - print(f" - {project.name} (ID: {project.id})") 885 - else: 886 - print(" No projects found or unexpected response format") 887 - except Exception as e: 888 - print(f"❌ Error listing projects: {e}") 889 - 890 - print(f"\n🤖 Using agent ID: {agent_id}") 891 - 892 892 # Send to Letta agent using streaming 893 893 message_stream = letta_client.agents.messages.create_stream( 894 894 agent_id=agent_id, ··· 970 970 except Exception as e: 971 971 print(f"Reply failed: {e}") 972 972 973 - def x_notification_loop(): 973 + def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 974 974 """ 975 - X notification loop using search-based mention detection. 976 - Uses search endpoint instead of mentions endpoint for better rate limits. 975 + Process an X mention and generate a reply using the Letta agent. 976 + Similar to bsky.py process_mention but for X/Twitter. 977 + 978 + Args: 979 + void_agent: The Letta agent instance 980 + x_client: The X API client 981 + mention_data: The mention data dictionary 982 + queue_filepath: Optional Path object to the queue file (for cleanup on halt) 983 + testing_mode: If True, don't actually post to X 984 + 985 + Returns: 986 + True: Successfully processed, remove from queue 987 + False: Failed but retryable, keep in queue 988 + None: Failed with non-retryable error, move to errors directory 989 + "no_reply": No reply was generated, move to no_reply directory 977 990 """ 978 - import time 979 - import json 980 - from pathlib import Path 991 + try: 992 + logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 993 + 994 + # Extract mention details 995 + if isinstance(mention_data, dict): 996 + # Handle both raw mention and queued mention formats 997 + if 'mention' in mention_data: 998 + mention = mention_data['mention'] 999 + else: 1000 + mention = mention_data 1001 + else: 1002 + mention = mention_data 1003 + 1004 + mention_id = mention.get('id') 1005 + mention_text = mention.get('text', '') 1006 + author_id = mention.get('author_id') 1007 + conversation_id = mention.get('conversation_id') 1008 + 1009 + logger.debug(f"Extracted data - ID: {mention_id}, Author: {author_id}, Text: {mention_text[:50]}...") 1010 + 1011 + if not conversation_id: 1012 + logger.warning(f"No conversation_id found for mention {mention_id}") 1013 + return None 1014 + 1015 + # Get thread context 1016 + try: 1017 + thread_data = x_client.get_thread_context(conversation_id) 1018 + if not thread_data: 1019 + logger.error(f"Failed to get thread context for conversation {conversation_id}") 1020 + return False 1021 + except Exception as e: 1022 + logger.error(f"Error getting thread context: {e}") 1023 + return False 1024 + 1025 + # Convert to YAML string 1026 + thread_context = thread_to_yaml_string(thread_data) 1027 + logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 1028 + 1029 + # Check for #voidstop 1030 + if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1031 + logger.info("Found #voidstop, skipping this mention") 1032 + return True 1033 + 1034 + # Ensure X user blocks are attached 1035 + try: 1036 + ensure_x_user_blocks_attached(thread_data, void_agent.id) 1037 + except Exception as e: 1038 + logger.warning(f"Failed to ensure X user blocks: {e}") 1039 + # Continue without user blocks rather than failing completely 1040 + 1041 + # Create prompt for Letta agent 1042 + author_info = thread_data.get('users', {}).get(author_id, {}) 1043 + author_username = author_info.get('username', 'unknown') 1044 + author_name = author_info.get('name', author_username) 1045 + 1046 + prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1047 + 1048 + MOST RECENT POST (the mention you're responding to): 1049 + "{mention_text}" 1050 + 1051 + FULL THREAD CONTEXT: 1052 + ```yaml 1053 + {thread_context} 1054 + ``` 1055 + 1056 + The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1057 + 1058 + To reply, use the add_post_to_x_thread tool: 1059 + - Each call creates one post (max 280 characters) 1060 + - For most responses, a single call is sufficient 1061 + - Only use multiple calls for threaded replies when: 1062 + * The topic requires extended explanation that cannot fit in 280 characters 1063 + * You're explicitly asked for a detailed/long response 1064 + * The conversation naturally benefits from a structured multi-part answer 1065 + - Avoid unnecessary threads - be concise when possible""" 1066 + 1067 + # Log mention processing 1068 + title = f"X MENTION FROM @{author_username}" 1069 + print(f"\n▶ {title}") 1070 + print(f" {'═' * len(title)}") 1071 + for line in mention_text.split('\n'): 1072 + print(f" {line}") 1073 + 1074 + # Send to Letta agent 1075 + from config_loader import get_letta_config 1076 + from letta_client import Letta 1077 + 1078 + config = get_letta_config() 1079 + letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1080 + 1081 + logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars") 1082 + 1083 + try: 1084 + # Use streaming to avoid timeout errors 1085 + message_stream = letta_client.agents.messages.create_stream( 1086 + agent_id=void_agent.id, 1087 + messages=[{"role": "user", "content": prompt}], 1088 + stream_tokens=False, 1089 + max_steps=100 1090 + ) 1091 + 1092 + # Collect streaming response (simplified version of bsky.py logic) 1093 + all_messages = [] 1094 + for chunk in message_stream: 1095 + if hasattr(chunk, 'message_type'): 1096 + if chunk.message_type == 'reasoning_message': 1097 + print("\n◆ Reasoning") 1098 + print(" ─────────") 1099 + for line in chunk.reasoning.split('\n'): 1100 + print(f" {line}") 1101 + elif chunk.message_type == 'tool_call_message': 1102 + tool_name = chunk.tool_call.name 1103 + if tool_name == 'add_post_to_x_thread': 1104 + try: 1105 + args = json.loads(chunk.tool_call.arguments) 1106 + text = args.get('text', '') 1107 + if text: 1108 + print("\n✎ X Post") 1109 + print(" ────────") 1110 + for line in text.split('\n'): 1111 + print(f" {line}") 1112 + except: 1113 + pass 1114 + elif tool_name == 'halt_activity': 1115 + logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1116 + if queue_filepath and queue_filepath.exists(): 1117 + queue_filepath.unlink() 1118 + logger.info(f"Deleted queue file: {queue_filepath.name}") 1119 + logger.info("=== X BOT TERMINATED BY AGENT ===") 1120 + exit(0) 1121 + elif chunk.message_type == 'tool_return_message': 1122 + tool_name = chunk.name 1123 + status = chunk.status 1124 + if status == 'success' and tool_name == 'add_post_to_x_thread': 1125 + print("\n✓ X Post Queued") 1126 + print(" ──────────────") 1127 + print(" Post queued successfully") 1128 + elif chunk.message_type == 'assistant_message': 1129 + print("\n▶ Assistant Response") 1130 + print(" ──────────────────") 1131 + for line in chunk.content.split('\n'): 1132 + print(f" {line}") 1133 + 1134 + all_messages.append(chunk) 1135 + if str(chunk) == 'done': 1136 + break 1137 + 1138 + # Convert streaming response for compatibility 1139 + message_response = type('StreamingResponse', (), { 1140 + 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1141 + })() 1142 + 1143 + except Exception as api_error: 1144 + logger.error(f"Letta API error: {api_error}") 1145 + raise 1146 + 1147 + # Extract successful add_post_to_x_thread tool calls 1148 + reply_candidates = [] 1149 + tool_call_results = {} 1150 + ignored_notification = False 1151 + ack_note = None # Track any note from annotate_ack tool 1152 + 1153 + # First pass: collect tool return statuses 1154 + for message in message_response.messages: 1155 + if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1156 + if message.name == 'add_post_to_x_thread': 1157 + tool_call_results[message.tool_call_id] = message.status 1158 + elif message.name == 'ignore_notification': 1159 + if message.status == 'success': 1160 + ignored_notification = True 1161 + logger.info("🚫 X notification ignored") 1162 + 1163 + # Second pass: collect successful tool calls 1164 + for message in message_response.messages: 1165 + if hasattr(message, 'tool_call') and message.tool_call: 1166 + # Collect annotate_ack tool calls 1167 + if message.tool_call.name == 'annotate_ack': 1168 + try: 1169 + args = json.loads(message.tool_call.arguments) 1170 + note = args.get('note', '') 1171 + if note: 1172 + ack_note = note 1173 + logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1174 + except json.JSONDecodeError as e: 1175 + logger.error(f"Failed to parse annotate_ack arguments: {e}") 1176 + 1177 + # Collect add_post_to_x_thread tool calls - only if they were successful 1178 + elif message.tool_call.name == 'add_post_to_x_thread': 1179 + tool_call_id = message.tool_call.tool_call_id 1180 + tool_status = tool_call_results.get(tool_call_id, 'unknown') 1181 + 1182 + if tool_status == 'success': 1183 + try: 1184 + args = json.loads(message.tool_call.arguments) 1185 + reply_text = args.get('text', '') 1186 + if reply_text: 1187 + reply_candidates.append(reply_text) 1188 + logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1189 + except json.JSONDecodeError as e: 1190 + logger.error(f"Failed to parse tool call arguments: {e}") 1191 + 1192 + # Handle conflicts 1193 + if reply_candidates and ignored_notification: 1194 + logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1195 + return False 1196 + 1197 + if reply_candidates: 1198 + # Post replies to X 1199 + logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1200 + 1201 + if len(reply_candidates) == 1: 1202 + content = reply_candidates[0] 1203 + title = f"Reply to @{author_username}" 1204 + else: 1205 + content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1206 + title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1207 + 1208 + print(f"\n✎ {title}") 1209 + print(f" {'─' * len(title)}") 1210 + for line in content.split('\n'): 1211 + print(f" {line}") 1212 + 1213 + if testing_mode: 1214 + logger.info("TESTING MODE: Skipping actual X post") 1215 + return True 1216 + else: 1217 + # Post to X using thread approach 1218 + success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1219 + if success: 1220 + logger.info(f"Successfully replied to @{author_username} on X") 1221 + 1222 + # Acknowledge the post we're replying to 1223 + try: 1224 + ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1225 + if ack_result: 1226 + if ack_note: 1227 + logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1228 + else: 1229 + logger.info(f"Successfully acknowledged X post from @{author_username}") 1230 + else: 1231 + logger.warning(f"Failed to acknowledge X post from @{author_username}") 1232 + except Exception as e: 1233 + logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1234 + # Don't fail the entire operation if acknowledgment fails 1235 + 1236 + return True 1237 + else: 1238 + logger.error(f"Failed to send reply to @{author_username} on X") 1239 + return False 1240 + else: 1241 + if ignored_notification: 1242 + logger.info(f"X mention from @{author_username} was explicitly ignored") 1243 + return "ignored" 1244 + else: 1245 + logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username}") 1246 + return "no_reply" 1247 + 1248 + except Exception as e: 1249 + logger.error(f"Error processing X mention: {e}") 1250 + return False 1251 + 1252 + def acknowledge_x_post(x_client, post_id, note=None): 1253 + """ 1254 + Acknowledge an X post that we replied to. 1255 + For X, we could implement this as a private note/database entry since X doesn't have 1256 + a built-in acknowledgment system like Bluesky's stream.thought.ack. 981 1257 982 - logger.info("=== STARTING X SEARCH-BASED NOTIFICATION LOOP ===") 1258 + Args: 1259 + x_client: XClient instance (reserved for future X API acknowledgment features) 1260 + post_id: The X post ID we're acknowledging 1261 + note: Optional note to include with the acknowledgment 1262 + 1263 + Returns: 1264 + True if successful, False otherwise 1265 + """ 1266 + try: 1267 + # x_client reserved for future X API acknowledgment features 1268 + # For now, implement as a simple log entry 1269 + # In the future, this could write to a database or file system 1270 + ack_dir = X_QUEUE_DIR / "acknowledgments" 1271 + ack_dir.mkdir(exist_ok=True) 1272 + 1273 + ack_data = { 1274 + 'post_id': post_id, 1275 + 'acknowledged_at': datetime.now().isoformat(), 1276 + 'note': note 1277 + } 1278 + 1279 + ack_file = ack_dir / f"ack_{post_id}.json" 1280 + with open(ack_file, 'w') as f: 1281 + json.dump(ack_data, f, indent=2) 1282 + 1283 + logger.debug(f"Acknowledged X post {post_id}" + (f" with note: {note[:50]}..." if note else "")) 1284 + return True 1285 + 1286 + except Exception as e: 1287 + logger.error(f"Error acknowledging X post {post_id}: {e}") 1288 + return False 1289 + 1290 + def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1291 + """ 1292 + Post a series of replies to X, threading them properly. 983 1293 1294 + Args: 1295 + x_client: XClient instance 1296 + in_reply_to_tweet_id: The original tweet ID to reply to 1297 + reply_messages: List of reply text strings 1298 + 1299 + Returns: 1300 + True if successful, False otherwise 1301 + """ 984 1302 try: 985 - client = create_x_client() 986 - logger.info("X client initialized") 1303 + current_reply_id = in_reply_to_tweet_id 987 1304 988 - # Get our username for searching 989 - user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1305 + for i, reply_text in enumerate(reply_messages): 1306 + logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1307 + 1308 + result = x_client.post_reply(reply_text, current_reply_id) 1309 + 1310 + if result and 'data' in result: 1311 + new_tweet_id = result['data']['id'] 1312 + logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1313 + # For threading, the next reply should reply to this one 1314 + current_reply_id = new_tweet_id 1315 + else: 1316 + logger.error(f"Failed to post X reply {i+1}") 1317 + return False 1318 + 1319 + return True 1320 + 1321 + except Exception as e: 1322 + logger.error(f"Error posting X thread replies: {e}") 1323 + return False 1324 + 1325 + def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1326 + """ 1327 + Load and process all X mentions from the queue. 1328 + Similar to bsky.py load_and_process_queued_notifications but for X. 1329 + """ 1330 + try: 1331 + # Get all X mention files in queue directory 1332 + queue_files = sorted(X_QUEUE_DIR.glob("x_mention_*.json")) 1333 + 1334 + if not queue_files: 1335 + return 1336 + 1337 + logger.info(f"Processing {len(queue_files)} queued X mentions") 1338 + 1339 + for i, filepath in enumerate(queue_files, 1): 1340 + logger.info(f"Processing X queue file {i}/{len(queue_files)}: {filepath.name}") 1341 + 1342 + try: 1343 + # Load mention data 1344 + with open(filepath, 'r') as f: 1345 + queue_data = json.load(f) 1346 + 1347 + mention_data = queue_data.get('mention', queue_data) 1348 + 1349 + # Process the mention 1350 + success = process_x_mention(void_agent, x_client, mention_data, 1351 + queue_filepath=filepath, testing_mode=testing_mode) 1352 + 1353 + # Handle file based on processing result 1354 + if success: 1355 + if testing_mode: 1356 + logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1357 + else: 1358 + filepath.unlink() 1359 + logger.info(f"Successfully processed and removed X file: {filepath.name}") 1360 + 1361 + # Mark as processed 1362 + processed_mentions = load_processed_mentions() 1363 + processed_mentions.add(mention_data.get('id')) 1364 + save_processed_mentions(processed_mentions) 1365 + 1366 + elif success is None: # Move to error directory 1367 + error_dir = X_QUEUE_DIR / "errors" 1368 + error_dir.mkdir(exist_ok=True) 1369 + error_path = error_dir / filepath.name 1370 + filepath.rename(error_path) 1371 + logger.warning(f"Moved X file {filepath.name} to errors directory") 1372 + 1373 + elif success == "no_reply": # Move to no_reply directory 1374 + no_reply_dir = X_QUEUE_DIR / "no_reply" 1375 + no_reply_dir.mkdir(exist_ok=True) 1376 + no_reply_path = no_reply_dir / filepath.name 1377 + filepath.rename(no_reply_path) 1378 + logger.info(f"Moved X file {filepath.name} to no_reply directory") 1379 + 1380 + elif success == "ignored": # Delete ignored notifications 1381 + filepath.unlink() 1382 + logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1383 + 1384 + else: 1385 + logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 1386 + 1387 + except Exception as e: 1388 + logger.error(f"💥 Error processing queued X mention {filepath.name}: {e}") 1389 + 1390 + except Exception as e: 1391 + logger.error(f"Error loading queued X mentions: {e}") 1392 + 1393 + def process_x_notifications(void_agent, x_client, testing_mode=False): 1394 + """ 1395 + Fetch new X mentions, queue them, and process the queue. 1396 + Similar to bsky.py process_notifications but for X. 1397 + """ 1398 + try: 1399 + # Get username for fetching mentions 1400 + user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 990 1401 if not user_info or "data" not in user_info: 991 - logger.error("Could not get username for search") 1402 + logger.error("Could not get username for X mentions") 992 1403 return 993 1404 994 1405 username = user_info["data"]["username"] 995 - logger.info(f"Monitoring mentions of @{username}") 1406 + 1407 + # Fetch and queue new mentions 1408 + new_count = fetch_and_queue_mentions(username) 1409 + 1410 + if new_count > 0: 1411 + logger.info(f"Found {new_count} new X mentions to process") 1412 + 1413 + # Process the entire queue 1414 + load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 996 1415 997 1416 except Exception as e: 998 - logger.error(f"Failed to initialize X client: {e}") 999 - return 1417 + logger.error(f"Error processing X notifications: {e}") 1418 + 1419 + def initialize_x_void(): 1420 + """Initialize the void agent for X operations.""" 1421 + logger.info("Starting void agent initialization for X...") 1000 1422 1001 - # Track the last seen mention ID to avoid duplicates 1002 - last_mention_id = None 1423 + from config_loader import get_letta_config 1424 + from letta_client import Letta 1425 + 1426 + # Get config 1427 + config = get_letta_config() 1428 + client = Letta(token=config['api_key'], timeout=config['timeout']) 1429 + agent_id = config['agent_id'] 1430 + 1431 + try: 1432 + void_agent = client.agents.retrieve(agent_id=agent_id) 1433 + logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 1434 + except Exception as e: 1435 + logger.error(f"Failed to load void agent {agent_id}: {e}") 1436 + raise e 1437 + 1438 + # Log agent details 1439 + logger.info(f"X Void agent details - ID: {void_agent.id}") 1440 + logger.info(f"Agent name: {void_agent.name}") 1441 + 1442 + return void_agent 1443 + 1444 + def x_main_loop(testing_mode=False): 1445 + """ 1446 + Main X bot loop that continuously monitors for mentions and processes them. 1447 + Similar to bsky.py main() but for X/Twitter. 1448 + """ 1449 + import time 1450 + from time import sleep 1451 + 1452 + logger.info("=== STARTING X VOID BOT ===") 1453 + 1454 + # Initialize void agent 1455 + void_agent = initialize_x_void() 1456 + logger.info(f"X void agent initialized: {void_agent.id}") 1457 + 1458 + # Initialize X client 1459 + x_client = create_x_client() 1460 + logger.info("Connected to X API") 1461 + 1462 + # Main loop 1463 + FETCH_DELAY_SEC = 60 # Check every minute for X mentions 1464 + logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 1465 + 1466 + if testing_mode: 1467 + logger.info("=== RUNNING IN X TESTING MODE ===") 1468 + logger.info(" - No messages will be sent to X") 1469 + logger.info(" - Queue files will not be deleted") 1470 + 1003 1471 cycle_count = 0 1472 + start_time = time.time() 1004 1473 1005 - # Search-based loop with better rate limits 1006 1474 while True: 1007 1475 try: 1008 1476 cycle_count += 1 1009 - logger.info(f"=== X SEARCH CYCLE {cycle_count} ===") 1477 + logger.info(f"=== X CYCLE {cycle_count} ===") 1010 1478 1011 - # Search for mentions using search endpoint 1012 - mentions = client.search_mentions( 1013 - username=username, 1014 - since_id=last_mention_id, 1015 - max_results=10 1016 - ) 1479 + # Process X notifications (fetch, queue, and process) 1480 + process_x_notifications(void_agent, x_client, testing_mode) 1017 1481 1018 - if mentions: 1019 - logger.info(f"Found {len(mentions)} new mentions via search") 1020 - 1021 - # Update last seen ID 1022 - if mentions: 1023 - last_mention_id = mentions[0]['id'] # Most recent first 1024 - 1025 - # Process each mention (just log for now) 1026 - for mention in mentions: 1027 - logger.info(f"Mention from {mention.get('author_id')}: {mention.get('text', '')[:100]}...") 1028 - 1029 - # Convert to YAML for inspection 1030 - yaml_mention = mention_to_yaml_string(mention) 1031 - 1032 - # Save to file for inspection (temporary) 1033 - debug_dir = Path("x_debug") 1034 - debug_dir.mkdir(exist_ok=True) 1035 - 1036 - mention_file = debug_dir / f"mention_{mention['id']}.yaml" 1037 - with open(mention_file, 'w') as f: 1038 - f.write(yaml_mention) 1039 - 1040 - logger.info(f"Saved mention debug info to {mention_file}") 1041 - else: 1042 - logger.info("No new mentions found via search") 1482 + # Log cycle completion 1483 + elapsed_time = time.time() - start_time 1484 + logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 1043 1485 1044 - # Sleep between cycles - search might have better rate limits 1045 - logger.info("Sleeping for 60 seconds...") 1046 - time.sleep(60) 1486 + sleep(FETCH_DELAY_SEC) 1047 1487 1048 1488 except KeyboardInterrupt: 1049 - logger.info("=== X SEARCH LOOP STOPPED BY USER ===") 1050 - logger.info(f"Processed {cycle_count} cycles") 1489 + elapsed_time = time.time() - start_time 1490 + logger.info("=== X BOT STOPPED BY USER ===") 1491 + logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 1051 1492 break 1052 1493 except Exception as e: 1053 - logger.error(f"Error in X search cycle {cycle_count}: {e}") 1054 - logger.info("Sleeping for 120 seconds due to error...") 1055 - time.sleep(120) 1494 + logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 1495 + logger.error(f"Error details: {e}") 1496 + logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 1497 + sleep(FETCH_DELAY_SEC * 2) 1498 + 1499 + def process_queue_only(testing_mode=False): 1500 + """ 1501 + Process all queued X mentions without fetching new ones. 1502 + Useful for rate limit management - queue first, then process separately. 1503 + 1504 + Args: 1505 + testing_mode: If True, don't actually post to X and keep queue files 1506 + """ 1507 + logger.info("=== PROCESSING X QUEUE ONLY ===") 1508 + 1509 + if testing_mode: 1510 + logger.info("=== RUNNING IN X TESTING MODE ===") 1511 + logger.info(" - No messages will be sent to X") 1512 + logger.info(" - Queue files will not be deleted") 1513 + 1514 + try: 1515 + # Initialize void agent 1516 + void_agent = initialize_x_void() 1517 + logger.info(f"X void agent initialized: {void_agent.id}") 1518 + 1519 + # Initialize X client 1520 + x_client = create_x_client() 1521 + logger.info("Connected to X API") 1522 + 1523 + # Process the queue without fetching new mentions 1524 + logger.info("Processing existing X queue...") 1525 + load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1526 + 1527 + logger.info("=== X QUEUE PROCESSING COMPLETE ===") 1528 + 1529 + except Exception as e: 1530 + logger.error(f"Error processing X queue: {e}") 1531 + raise 1532 + 1533 + def x_notification_loop(): 1534 + """ 1535 + DEPRECATED: Old X notification loop using search-based mention detection. 1536 + Use x_main_loop() instead for the full bot experience. 1537 + """ 1538 + logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 1539 + x_main_loop() 1056 1540 1057 1541 if __name__ == "__main__": 1058 1542 import sys 1543 + import argparse 1059 1544 1060 1545 if len(sys.argv) > 1: 1061 - if sys.argv[1] == "loop": 1546 + if sys.argv[1] == "bot": 1547 + # Main bot with optional --test flag 1548 + parser = argparse.ArgumentParser(description='X Void Bot') 1549 + parser.add_argument('command', choices=['bot']) 1550 + parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 1551 + args = parser.parse_args() 1552 + x_main_loop(testing_mode=args.test) 1553 + elif sys.argv[1] == "loop": 1062 1554 x_notification_loop() 1063 1555 elif sys.argv[1] == "reply": 1064 1556 reply_to_cameron_post() ··· 1070 1562 test_fetch_and_queue() 1071 1563 elif sys.argv[1] == "thread": 1072 1564 test_thread_context() 1565 + elif sys.argv[1] == "process": 1566 + # Process all queued mentions with optional --test flag 1567 + testing_mode = "--test" in sys.argv 1568 + process_queue_only(testing_mode=testing_mode) 1073 1569 elif sys.argv[1] == "letta": 1074 - # Use specific agent ID if provided, otherwise use default 1075 - agent_id = sys.argv[2] if len(sys.argv) > 2 else "agent-94a01ee3-3023-46e9-baf8-6172f09bee99" 1570 + # Use specific agent ID if provided, otherwise use from config 1571 + agent_id = sys.argv[2] if len(sys.argv) > 2 else None 1076 1572 test_letta_integration(agent_id) 1077 1573 else: 1078 - print("Usage: python x.py [loop|reply|me|search|queue|thread|letta]") 1079 - print(" loop - Run the notification monitoring loop") 1080 - print(" reply - Reply to Cameron's specific post") 1081 - print(" me - Get authenticated user info and correct user ID") 1082 - print(" search - Test search-based mention detection") 1083 - print(" queue - Test fetch and queue mentions (single pass)") 1084 - print(" thread - Test thread context retrieval from queued mention") 1085 - print(" letta - Test sending thread context to Letta agent") 1086 - print(" Optional: python x.py letta <agent-id>") 1574 + print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta]") 1575 + print(" bot - Run the main X bot (use --test for testing mode)") 1576 + print(" Example: python x.py bot --test") 1577 + print(" queue - Fetch and queue mentions only (no processing)") 1578 + print(" process - Process all queued mentions only (no fetching)") 1579 + print(" Example: python x.py process --test") 1580 + print(" loop - Run the old notification monitoring loop (deprecated)") 1581 + print(" reply - Reply to Cameron's specific post") 1582 + print(" me - Get authenticated user info and correct user ID") 1583 + print(" search - Test search-based mention detection") 1584 + print(" thread - Test thread context retrieval from queued mention") 1585 + print(" letta - Test sending thread context to Letta agent") 1586 + print(" Optional: python x.py letta <agent-id>") 1087 1587 else: 1088 1588 test_x_client()
+5
x_queue/acknowledgments/ack_1950690566909710618.json
··· 1 + { 2 + "post_id": "1950690566909710618", 3 + "acknowledged_at": "2025-07-30T19:03:21.675396", 4 + "note": null 5 + }
+5
x_queue/acknowledgments/ack_1950714596828061885.json
··· 1 + { 2 + "post_id": "1950714596828061885", 3 + "acknowledged_at": "2025-07-30T19:03:43.784906", 4 + "note": null 5 + }
+5
x_queue/acknowledgments/ack_1950741288724423041.json
··· 1 + { 2 + "post_id": "1950741288724423041", 3 + "acknowledged_at": "2025-07-30T19:13:29.522859", 4 + "note": null 5 + }
+1
x_queue/processed_mentions.json
··· 1 + ["1950741288724423041", "1950690566909710618", "1950714596828061885", "1950739368530120865"]