a digital person for bluesky
1import os 2import logging 3import requests 4import yaml 5import json 6import hashlib 7import random 8import time 9from typing import Optional, Dict, Any, List, Set 10from datetime import datetime 11from pathlib import Path 12from requests_oauthlib import OAuth1 13from rich import print as rprint 14from rich.panel import Panel 15from rich.text import Text 16 17import bsky_utils 18 19class XRateLimitError(Exception): 20 """Exception raised when X API rate limit is exceeded""" 21 pass 22 23 24# Configure logging 25logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27) 28logger = logging.getLogger("x_client") 29 30# X-specific file paths 31X_QUEUE_DIR = Path("x_queue") 32X_CACHE_DIR = Path("x_cache") 33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt") 36 37class XClient: 38 """X (Twitter) API client for fetching mentions and managing interactions.""" 39 40 def __init__(self, api_key: str, user_id: str, access_token: str = None, 41 consumer_key: str = None, consumer_secret: str = None, 42 access_token_secret: str = None): 43 self.api_key = api_key 44 self.access_token = access_token 45 self.user_id = user_id 46 self.base_url = "https://api.x.com/2" 47 48 # Check if we have OAuth 1.0a credentials 49 if (consumer_key and consumer_secret and access_token and access_token_secret): 50 # Use OAuth 1.0a for User Context 51 self.oauth = OAuth1( 52 consumer_key, 53 client_secret=consumer_secret, 54 resource_owner_key=access_token, 55 resource_owner_secret=access_token_secret 56 ) 57 self.headers = {"Content-Type": "application/json"} 58 self.auth_method = "oauth1a" 59 logger.info("Using OAuth 1.0a User Context authentication for X API") 60 elif access_token: 61 # Use OAuth 2.0 Bearer token for User Context 62 self.oauth = None 63 self.headers = { 64 "Authorization": f"Bearer {access_token}", 65 "Content-Type": "application/json" 66 } 67 self.auth_method = "oauth2_user" 68 logger.info("Using OAuth 2.0 User Context access token for X API") 69 else: 70 # Use Application-Only Bearer token 71 self.oauth = None 72 self.headers = { 73 "Authorization": f"Bearer {api_key}", 74 "Content-Type": "application/json" 75 } 76 self.auth_method = "bearer" 77 logger.info("Using Application-Only Bearer token for X API") 78 79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None) -> Optional[Dict]: 80 """Make a request to the X API with proper error handling.""" 81 url = f"{self.base_url}{endpoint}" 82 83 try: 84 if method.upper() == "GET": 85 if self.oauth: 86 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 87 else: 88 response = requests.get(url, headers=self.headers, params=params) 89 elif method.upper() == "POST": 90 if self.oauth: 91 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 92 else: 93 response = requests.post(url, headers=self.headers, json=data) 94 else: 95 raise ValueError(f"Unsupported HTTP method: {method}") 96 97 response.raise_for_status() 98 return response.json() 99 except requests.exceptions.HTTPError as e: 100 if response.status_code == 401: 101 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 102 logger.error(f"Response: {response.text}") 103 elif response.status_code == 403: 104 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 105 logger.error(f"Response: {response.text}") 106 elif response.status_code == 429: 107 logger.error("X API rate limit exceeded - waiting 60 seconds before retry") 108 logger.error(f"Response: {response.text}") 109 time.sleep(60) 110 raise XRateLimitError("X API rate limit exceeded") 111 else: 112 logger.error(f"X API request failed: {e}") 113 logger.error(f"Response: {response.text}") 114 return None 115 except Exception as e: 116 logger.error(f"Unexpected error making X API request: {e}") 117 return None 118 119 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]: 120 """ 121 Fetch mentions for the configured user. 122 123 Args: 124 since_id: Minimum Post ID to include (for getting newer mentions) 125 max_results: Number of results to return (5-100) 126 127 Returns: 128 List of mention objects or None if request failed 129 """ 130 endpoint = f"/users/{self.user_id}/mentions" 131 params = { 132 "max_results": min(max(max_results, 5), 100), # Ensure within API limits 133 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 134 "user.fields": "id,name,username", 135 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 136 } 137 138 if since_id: 139 params["since_id"] = since_id 140 141 logger.info(f"Fetching mentions for user {self.user_id}") 142 response = self._make_request(endpoint, params) 143 144 if response: 145 logger.debug(f"X API response: {response}") 146 147 if response and "data" in response: 148 mentions = response["data"] 149 logger.info(f"Retrieved {len(mentions)} mentions") 150 return mentions 151 else: 152 if response: 153 logger.info(f"No mentions in response. Full response: {response}") 154 else: 155 logger.warning("Request failed - no response received") 156 return [] 157 158 def get_user_info(self, user_id: str) -> Optional[Dict]: 159 """Get information about a specific user.""" 160 endpoint = f"/users/{user_id}" 161 params = { 162 "user.fields": "id,name,username,description,public_metrics" 163 } 164 165 response = self._make_request(endpoint, params) 166 return response.get("data") if response else None 167 168 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 169 """ 170 Search for mentions using the search endpoint instead of mentions endpoint. 171 This might have better rate limits than the direct mentions endpoint. 172 173 Args: 174 username: Username to search for mentions of (without @) 175 max_results: Number of results to return (10-100) 176 since_id: Only return results newer than this tweet ID 177 178 Returns: 179 List of tweets mentioning the username 180 """ 181 endpoint = "/tweets/search/recent" 182 183 # Search for mentions of the username 184 query = f"@{username}" 185 186 params = { 187 "query": query, 188 "max_results": min(max(max_results, 10), 100), 189 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 190 "user.fields": "id,name,username", 191 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 192 } 193 194 if since_id: 195 params["since_id"] = since_id 196 197 logger.info(f"Searching for mentions of @{username}") 198 response = self._make_request(endpoint, params) 199 200 if response and "data" in response: 201 tweets = response["data"] 202 logger.info(f"Found {len(tweets)} mentions via search") 203 return tweets 204 else: 205 if response: 206 logger.info(f"No mentions found via search. Response: {response}") 207 else: 208 logger.warning("Search request failed") 209 return [] 210 211 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]: 212 """ 213 Get all tweets in a conversation thread up to a specific tweet ID. 214 215 Args: 216 conversation_id: The conversation ID to fetch (should be the original tweet ID) 217 use_cache: Whether to use cached data if available 218 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID) 219 220 Returns: 221 List of tweets in the conversation, ordered chronologically 222 """ 223 # Check cache first if enabled 224 if use_cache: 225 cached_data = get_cached_thread_context(conversation_id) 226 if cached_data: 227 return cached_data 228 229 # First, get the original tweet directly since it might not appear in conversation search 230 original_tweet = None 231 try: 232 endpoint = f"/tweets/{conversation_id}" 233 params = { 234 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 235 "user.fields": "id,name,username", 236 "expansions": "author_id" 237 } 238 response = self._make_request(endpoint, params) 239 if response and "data" in response: 240 original_tweet = response["data"] 241 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}") 242 except Exception as e: 243 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}") 244 245 # Then search for all tweets in this conversation 246 endpoint = "/tweets/search/recent" 247 params = { 248 "query": f"conversation_id:{conversation_id}", 249 "max_results": 100, # Get as many as possible 250 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 251 "user.fields": "id,name,username", 252 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id", 253 "sort_order": "recency" # Get newest first, we'll reverse later 254 } 255 256 # Add until_id parameter to exclude tweets after the mention being processed 257 if until_id: 258 params["until_id"] = until_id 259 logger.info(f"Using until_id={until_id} to exclude future tweets") 260 261 logger.info(f"Fetching thread context for conversation {conversation_id}") 262 response = self._make_request(endpoint, params) 263 264 tweets = [] 265 users_data = {} 266 267 # Collect tweets from search 268 if response and "data" in response: 269 tweets.extend(response["data"]) 270 # Store user data for reference 271 if "includes" in response and "users" in response["includes"]: 272 for user in response["includes"]["users"]: 273 users_data[user["id"]] = user 274 275 # Add original tweet if we got it and it's not already in the list 276 if original_tweet: 277 tweet_ids = [t.get('id') for t in tweets] 278 if original_tweet.get('id') not in tweet_ids: 279 tweets.append(original_tweet) 280 logger.info("Added original tweet to thread context") 281 282 # Attempt to fill gaps by fetching referenced tweets that are missing 283 # This helps with X API's incomplete conversation search results 284 tweet_ids = set(t.get('id') for t in tweets) 285 missing_tweet_ids = set() 286 287 # Collect all referenced tweet IDs that aren't in our current set 288 for tweet in tweets: 289 referenced_tweets = tweet.get('referenced_tweets', []) 290 for ref in referenced_tweets: 291 ref_id = ref.get('id') 292 if ref_id and ref_id not in tweet_ids: 293 missing_tweet_ids.add(ref_id) 294 295 # Fetch missing referenced tweets individually 296 for missing_id in missing_tweet_ids: 297 try: 298 endpoint = f"/tweets/{missing_id}" 299 params = { 300 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 301 "user.fields": "id,name,username", 302 "expansions": "author_id" 303 } 304 response = self._make_request(endpoint, params) 305 if response and "data" in response: 306 missing_tweet = response["data"] 307 # Only add if it's actually part of this conversation 308 if missing_tweet.get('conversation_id') == conversation_id: 309 tweets.append(missing_tweet) 310 tweet_ids.add(missing_id) 311 logger.info(f"Retrieved missing referenced tweet: {missing_id}") 312 313 # Also add user data if available 314 if "includes" in response and "users" in response["includes"]: 315 for user in response["includes"]["users"]: 316 users_data[user["id"]] = user 317 except Exception as e: 318 logger.warning(f"Could not fetch missing tweet {missing_id}: {e}") 319 320 if tweets: 321 # Filter out tweets that occur after until_id (if specified) 322 if until_id: 323 original_count = len(tweets) 324 # Convert until_id to int for comparison (Twitter IDs are sequential) 325 until_id_int = int(until_id) 326 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int] 327 filtered_count = len(tweets) 328 if original_count != filtered_count: 329 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}") 330 331 # Sort chronologically (oldest first) 332 tweets.sort(key=lambda x: x.get('created_at', '')) 333 logger.info(f"Retrieved {len(tweets)} tweets in thread") 334 335 thread_data = {"tweets": tweets, "users": users_data} 336 337 # Cache the result 338 if use_cache: 339 save_cached_thread_context(conversation_id, thread_data) 340 341 return thread_data 342 else: 343 logger.warning("No tweets found for thread context") 344 return None 345 346 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 347 """ 348 Post a reply to a specific tweet. 349 350 Args: 351 reply_text: The text content of the reply 352 in_reply_to_tweet_id: The ID of the tweet to reply to 353 354 Returns: 355 Response data if successful, None if failed 356 """ 357 endpoint = "/tweets" 358 359 payload = { 360 "text": reply_text, 361 "reply": { 362 "in_reply_to_tweet_id": in_reply_to_tweet_id 363 } 364 } 365 366 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 367 result = self._make_request(endpoint, method="POST", data=payload) 368 369 if result: 370 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}") 371 return result 372 else: 373 logger.error("Failed to post reply") 374 return None 375 376def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]: 377 """Load X configuration from config file.""" 378 try: 379 with open(config_path, 'r') as f: 380 config = yaml.safe_load(f) 381 382 x_config = config.get('x', {}) 383 if not x_config.get('api_key') or not x_config.get('user_id'): 384 raise ValueError("X API key and user_id must be configured in config.yaml") 385 386 return x_config 387 except Exception as e: 388 logger.error(f"Failed to load X configuration: {e}") 389 raise 390 391def create_x_client(config_path: str = "config.yaml") -> XClient: 392 """Create and return an X client with configuration loaded from file.""" 393 config = load_x_config(config_path) 394 return XClient( 395 api_key=config['api_key'], 396 user_id=config['user_id'], 397 access_token=config.get('access_token'), 398 consumer_key=config.get('consumer_key'), 399 consumer_secret=config.get('consumer_secret'), 400 access_token_secret=config.get('access_token_secret') 401 ) 402 403def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str: 404 """ 405 Convert a mention object to a YAML string for better AI comprehension. 406 Similar to thread_to_yaml_string in bsky_utils.py 407 """ 408 # Extract relevant fields 409 simplified_mention = { 410 'id': mention.get('id'), 411 'text': mention.get('text'), 412 'author_id': mention.get('author_id'), 413 'created_at': mention.get('created_at'), 414 'in_reply_to_user_id': mention.get('in_reply_to_user_id') 415 } 416 417 # Add user information if available 418 if users_data and mention.get('author_id') in users_data: 419 user = users_data[mention.get('author_id')] 420 simplified_mention['author'] = { 421 'username': user.get('username'), 422 'name': user.get('name') 423 } 424 425 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 426 427def thread_to_yaml_string(thread_data: Dict) -> str: 428 """ 429 Convert X thread context to YAML string for AI comprehension. 430 Similar to Bluesky's thread_to_yaml_string function. 431 432 Args: 433 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 434 435 Returns: 436 YAML string representation of the thread 437 """ 438 if not thread_data or "tweets" not in thread_data: 439 return "conversation: []\n" 440 441 tweets = thread_data["tweets"] 442 users_data = thread_data.get("users", {}) 443 444 simplified_thread = { 445 "conversation": [] 446 } 447 448 for tweet in tweets: 449 # Get user info 450 author_id = tweet.get('author_id') 451 author_info = {} 452 if author_id and author_id in users_data: 453 user = users_data[author_id] 454 author_info = { 455 'username': user.get('username'), 456 'name': user.get('name') 457 } 458 459 # Build tweet object (simplified for AI consumption) 460 tweet_obj = { 461 'text': tweet.get('text'), 462 'created_at': tweet.get('created_at'), 463 'author': author_info, 464 'author_id': author_id # Include user ID for block management 465 } 466 467 simplified_thread["conversation"].append(tweet_obj) 468 469 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False) 470 471 472def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None: 473 """ 474 Ensure all users in the thread have their X user blocks attached. 475 Creates blocks with initial content including their handle if they don't exist. 476 477 Args: 478 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 479 agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id) 480 """ 481 if not thread_data or "users" not in thread_data: 482 return 483 484 try: 485 from tools.blocks import attach_x_user_blocks, x_user_note_set 486 from config_loader import get_letta_config 487 from letta_client import Letta 488 489 # Get Letta client and agent_id from config 490 config = get_letta_config() 491 client = Letta(token=config['api_key'], timeout=config['timeout']) 492 493 # Use provided agent_id or get from config 494 if agent_id is None: 495 agent_id = config['agent_id'] 496 497 # Get agent info to create a mock agent_state for the functions 498 class MockAgentState: 499 def __init__(self, agent_id): 500 self.id = agent_id 501 502 agent_state = MockAgentState(agent_id) 503 504 users_data = thread_data["users"] 505 user_ids = list(users_data.keys()) 506 507 if not user_ids: 508 return 509 510 logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}") 511 512 # Get current blocks to check which users already have blocks with content 513 current_blocks = client.agents.blocks.list(agent_id=agent_id) 514 existing_user_blocks = {} 515 516 for block in current_blocks: 517 if block.label.startswith("x_user_"): 518 user_id = block.label.replace("x_user_", "") 519 existing_user_blocks[user_id] = block 520 521 # Attach all user blocks (this will create missing ones with basic content) 522 attach_result = attach_x_user_blocks(user_ids, agent_state) 523 logger.info(f"X user block attachment result: {attach_result}") 524 525 # For newly created blocks, update with user handle information 526 for user_id in user_ids: 527 if user_id not in existing_user_blocks: 528 user_info = users_data[user_id] 529 username = user_info.get('username', 'unknown') 530 name = user_info.get('name', 'Unknown') 531 532 # Set initial content with handle information 533 initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet." 534 535 try: 536 x_user_note_set(user_id, initial_content, agent_state) 537 logger.info(f"Set initial content for X user {user_id} (@{username})") 538 except Exception as e: 539 logger.error(f"Failed to set initial content for X user {user_id}: {e}") 540 541 except Exception as e: 542 logger.error(f"Error ensuring X user blocks: {e}") 543 544 545# X Caching and Queue System Functions 546 547def load_last_seen_id() -> Optional[str]: 548 """Load the last seen mention ID for incremental fetching.""" 549 if X_LAST_SEEN_FILE.exists(): 550 try: 551 with open(X_LAST_SEEN_FILE, 'r') as f: 552 data = json.load(f) 553 return data.get('last_seen_id') 554 except Exception as e: 555 logger.error(f"Error loading last seen ID: {e}") 556 return None 557 558def save_last_seen_id(mention_id: str): 559 """Save the last seen mention ID.""" 560 try: 561 X_QUEUE_DIR.mkdir(exist_ok=True) 562 with open(X_LAST_SEEN_FILE, 'w') as f: 563 json.dump({ 564 'last_seen_id': mention_id, 565 'updated_at': datetime.now().isoformat() 566 }, f) 567 logger.debug(f"Saved last seen ID: {mention_id}") 568 except Exception as e: 569 logger.error(f"Error saving last seen ID: {e}") 570 571def load_processed_mentions() -> set: 572 """Load the set of processed mention IDs.""" 573 if X_PROCESSED_MENTIONS_FILE.exists(): 574 try: 575 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 576 data = json.load(f) 577 # Keep only recent entries (last 10000) 578 if len(data) > 10000: 579 data = data[-10000:] 580 save_processed_mentions(set(data)) 581 return set(data) 582 except Exception as e: 583 logger.error(f"Error loading processed mentions: {e}") 584 return set() 585 586def save_processed_mentions(processed_set: set): 587 """Save the set of processed mention IDs.""" 588 try: 589 X_QUEUE_DIR.mkdir(exist_ok=True) 590 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 591 json.dump(list(processed_set), f) 592 except Exception as e: 593 logger.error(f"Error saving processed mentions: {e}") 594 595def load_downrank_users() -> Set[str]: 596 """Load the set of user IDs that should be downranked (responded to 10% of the time).""" 597 try: 598 if not X_DOWNRANK_USERS_FILE.exists(): 599 return set() 600 601 downrank_users = set() 602 with open(X_DOWNRANK_USERS_FILE, 'r') as f: 603 for line in f: 604 line = line.strip() 605 # Skip empty lines and comments 606 if line and not line.startswith('#'): 607 downrank_users.add(line) 608 609 logger.info(f"Loaded {len(downrank_users)} downrank users") 610 return downrank_users 611 except Exception as e: 612 logger.error(f"Error loading downrank users: {e}") 613 return set() 614 615def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool: 616 """ 617 Check if we should respond to a downranked user. 618 Returns True 10% of the time for downranked users, True 100% of the time for others. 619 """ 620 if user_id not in downrank_users: 621 return True 622 623 # 10% chance for downranked users 624 should_respond = random.random() < 0.1 625 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 626 return should_respond 627 628def save_mention_to_queue(mention: Dict): 629 """Save a mention to the queue directory for async processing.""" 630 try: 631 mention_id = mention.get('id') 632 if not mention_id: 633 logger.error("Mention missing ID, cannot queue") 634 return 635 636 # Check if already processed 637 processed_mentions = load_processed_mentions() 638 if mention_id in processed_mentions: 639 logger.debug(f"Mention {mention_id} already processed, skipping") 640 return 641 642 # Create queue directory 643 X_QUEUE_DIR.mkdir(exist_ok=True) 644 645 # Create filename using hash (similar to Bluesky system) 646 mention_str = json.dumps(mention, sort_keys=True) 647 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 648 filename = f"x_mention_{mention_hash}.json" 649 650 queue_file = X_QUEUE_DIR / filename 651 652 # Save mention data with enhanced debugging information 653 mention_data = { 654 'mention': mention, 655 'queued_at': datetime.now().isoformat(), 656 'type': 'x_mention', 657 # Debug info for conversation tracking 658 'debug_info': { 659 'mention_id': mention.get('id'), 660 'author_id': mention.get('author_id'), 661 'conversation_id': mention.get('conversation_id'), 662 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 663 'referenced_tweets': mention.get('referenced_tweets', []), 664 'text_preview': mention.get('text', '')[:200], 665 'created_at': mention.get('created_at'), 666 'public_metrics': mention.get('public_metrics', {}), 667 'context_annotations': mention.get('context_annotations', []) 668 } 669 } 670 671 with open(queue_file, 'w') as f: 672 json.dump(mention_data, f, indent=2) 673 674 logger.info(f"Queued X mention {mention_id} -> {filename}") 675 676 except Exception as e: 677 logger.error(f"Error saving mention to queue: {e}") 678 679# X Cache Functions 680def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 681 """Load cached thread context if available.""" 682 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 683 if cache_file.exists(): 684 try: 685 with open(cache_file, 'r') as f: 686 cached_data = json.load(f) 687 # Check if cache is recent (within 1 hour) 688 from datetime import datetime, timedelta 689 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 690 if datetime.now() - cached_time < timedelta(hours=1): 691 logger.info(f"Using cached thread context for {conversation_id}") 692 return cached_data.get('thread_data') 693 except Exception as e: 694 logger.warning(f"Error loading cached thread context: {e}") 695 return None 696 697def save_cached_thread_context(conversation_id: str, thread_data: Dict): 698 """Save thread context to cache.""" 699 try: 700 X_CACHE_DIR.mkdir(exist_ok=True) 701 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 702 703 cache_data = { 704 'conversation_id': conversation_id, 705 'thread_data': thread_data, 706 'cached_at': datetime.now().isoformat() 707 } 708 709 with open(cache_file, 'w') as f: 710 json.dump(cache_data, f, indent=2) 711 712 logger.debug(f"Cached thread context for {conversation_id}") 713 except Exception as e: 714 logger.error(f"Error caching thread context: {e}") 715 716def fetch_and_queue_mentions(username: str) -> int: 717 """ 718 Single-pass function to fetch new mentions and queue them. 719 Returns number of new mentions found. 720 """ 721 try: 722 client = create_x_client() 723 724 # Load last seen ID for incremental fetching 725 last_seen_id = load_last_seen_id() 726 727 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 728 729 # Search for mentions 730 mentions = client.search_mentions( 731 username=username, 732 since_id=last_seen_id, 733 max_results=100 # Get as many as possible 734 ) 735 736 if not mentions: 737 logger.info("No new mentions found") 738 return 0 739 740 # Process mentions (newest first, so reverse to process oldest first) 741 mentions.reverse() 742 new_count = 0 743 744 for mention in mentions: 745 save_mention_to_queue(mention) 746 new_count += 1 747 748 # Update last seen ID to the most recent mention 749 if mentions: 750 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 751 save_last_seen_id(most_recent_id) 752 753 logger.info(f"Queued {new_count} new X mentions") 754 return new_count 755 756 except Exception as e: 757 logger.error(f"Error fetching and queuing mentions: {e}") 758 return 0 759 760# Simple test function 761def get_my_user_info(): 762 """Get the authenticated user's information to find correct user ID.""" 763 try: 764 client = create_x_client() 765 766 # Use the /2/users/me endpoint to get authenticated user info 767 endpoint = "/users/me" 768 params = { 769 "user.fields": "id,name,username,description" 770 } 771 772 print("Fetching authenticated user information...") 773 response = client._make_request(endpoint, params=params) 774 775 if response and "data" in response: 776 user_data = response["data"] 777 print(f"✅ Found authenticated user:") 778 print(f" ID: {user_data.get('id')}") 779 print(f" Username: @{user_data.get('username')}") 780 print(f" Name: {user_data.get('name')}") 781 print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 782 print(f"\n🔧 Update your config.yaml with:") 783 print(f" user_id: \"{user_data.get('id')}\"") 784 return user_data 785 else: 786 print("❌ Failed to get user information") 787 print(f"Response: {response}") 788 return None 789 790 except Exception as e: 791 print(f"Error getting user info: {e}") 792 return None 793 794def test_search_mentions(): 795 """Test the search-based mention detection.""" 796 try: 797 client = create_x_client() 798 799 # First get our username 800 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 801 if not user_info or "data" not in user_info: 802 print("❌ Could not get username") 803 return 804 805 username = user_info["data"]["username"] 806 print(f"🔍 Searching for mentions of @{username}") 807 808 mentions = client.search_mentions(username, max_results=5) 809 810 if mentions: 811 print(f"✅ Found {len(mentions)} mentions via search:") 812 for mention in mentions: 813 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 814 else: 815 print("No mentions found via search") 816 817 except Exception as e: 818 print(f"Search test failed: {e}") 819 820def test_fetch_and_queue(): 821 """Test the single-pass fetch and queue function.""" 822 try: 823 client = create_x_client() 824 825 # Get our username 826 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 827 if not user_info or "data" not in user_info: 828 print("❌ Could not get username") 829 return 830 831 username = user_info["data"]["username"] 832 print(f"🔄 Fetching and queueing mentions for @{username}") 833 834 # Show current state 835 last_seen = load_last_seen_id() 836 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 837 838 # Fetch and queue 839 new_count = fetch_and_queue_mentions(username) 840 841 if new_count > 0: 842 print(f"✅ Queued {new_count} new mentions") 843 print(f"📁 Check ./x_queue/ directory for queued mentions") 844 845 # Show updated state 846 new_last_seen = load_last_seen_id() 847 print(f"📍 Updated last seen ID: {new_last_seen}") 848 else: 849 print("ℹ️ No new mentions to queue") 850 851 except Exception as e: 852 print(f"Fetch and queue test failed: {e}") 853 854def test_thread_context(): 855 """Test thread context retrieval from a queued mention.""" 856 try: 857 import json 858 859 # Find a queued mention file 860 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 861 if not queue_files: 862 print("❌ No queued mentions found. Run 'python x.py queue' first.") 863 return 864 865 # Read the first mention 866 mention_file = queue_files[0] 867 with open(mention_file, 'r') as f: 868 mention_data = json.load(f) 869 870 mention = mention_data['mention'] 871 print(f"📄 Using mention: {mention.get('id')}") 872 print(f"📝 Text: {mention.get('text')}") 873 874 # Check if it has a conversation_id 875 conversation_id = mention.get('conversation_id') 876 if not conversation_id: 877 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.") 878 return 879 880 print(f"🧵 Getting thread context for conversation: {conversation_id}") 881 882 # Get thread context 883 client = create_x_client() 884 thread_data = client.get_thread_context(conversation_id) 885 886 if thread_data: 887 tweets = thread_data.get('tweets', []) 888 print(f"✅ Retrieved thread with {len(tweets)} tweets") 889 890 # Convert to YAML 891 yaml_thread = thread_to_yaml_string(thread_data) 892 893 # Save thread context for inspection 894 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml" 895 with open(thread_file, 'w') as f: 896 f.write(yaml_thread) 897 898 print(f"💾 Saved thread context to: {thread_file}") 899 print("\n📋 Thread preview:") 900 print(yaml_thread) 901 else: 902 print("❌ Failed to retrieve thread context") 903 904 except Exception as e: 905 print(f"Thread context test failed: {e}") 906 907def test_letta_integration(agent_id: str = None): 908 """Test sending X thread context to Letta agent.""" 909 try: 910 from letta_client import Letta 911 import json 912 import yaml 913 914 # Load full config to access letta section 915 try: 916 with open("config.yaml", 'r') as f: 917 full_config = yaml.safe_load(f) 918 919 letta_config = full_config.get('letta', {}) 920 api_key = letta_config.get('api_key') 921 config_agent_id = letta_config.get('agent_id') 922 923 # Use agent_id from config if not provided as parameter 924 if not agent_id: 925 if config_agent_id: 926 agent_id = config_agent_id 927 print(f"ℹ️ Using agent_id from config: {agent_id}") 928 else: 929 print("❌ No agent_id found in config.yaml") 930 print("Expected config structure:") 931 print(" letta:") 932 print(" agent_id: your-agent-id") 933 return 934 else: 935 print(f"ℹ️ Using provided agent_id: {agent_id}") 936 937 if not api_key: 938 # Try loading from environment as fallback 939 import os 940 api_key = os.getenv('LETTA_API_KEY') 941 if not api_key: 942 print("❌ LETTA_API_KEY not found in config.yaml or environment") 943 print("Expected config structure:") 944 print(" letta:") 945 print(" api_key: your-letta-api-key") 946 return 947 else: 948 print("ℹ️ Using LETTA_API_KEY from environment") 949 else: 950 print("ℹ️ Using LETTA_API_KEY from config.yaml") 951 952 except Exception as e: 953 print(f"❌ Error loading config: {e}") 954 return 955 956 letta_client = Letta(token=api_key, timeout=600) 957 print(f"🤖 Connected to Letta, using agent: {agent_id}") 958 959 # Find a queued mention file 960 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 961 if not queue_files: 962 print("❌ No queued mentions found. Run 'python x.py queue' first.") 963 return 964 965 # Read the first mention 966 mention_file = queue_files[0] 967 with open(mention_file, 'r') as f: 968 mention_data = json.load(f) 969 970 mention = mention_data['mention'] 971 conversation_id = mention.get('conversation_id') 972 973 if not conversation_id: 974 print("❌ No conversation_id found in mention.") 975 return 976 977 print(f"🧵 Getting thread context for conversation: {conversation_id}") 978 979 # Get thread context 980 x_client = create_x_client() 981 thread_data = x_client.get_thread_context(conversation_id) 982 983 if not thread_data: 984 print("❌ Failed to retrieve thread context") 985 return 986 987 # Convert to YAML 988 yaml_thread = thread_to_yaml_string(thread_data) 989 990 # Create prompt for the agent 991 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 992 993Here is the thread context: 994 995{yaml_thread} 996 997Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 998 999 print(f"📤 Sending thread context to Letta agent...") 1000 1001 # Print the prompt in a rich panel 1002 rprint(Panel(prompt, title="Prompt", border_style="blue")) 1003 1004 # Send to Letta agent using streaming 1005 message_stream = letta_client.agents.messages.create_stream( 1006 agent_id=agent_id, 1007 messages=[{"role": "user", "content": prompt}], 1008 stream_tokens=False, 1009 max_steps=10 1010 ) 1011 1012 print("🔄 Streaming response from agent...") 1013 response_text = "" 1014 1015 for chunk in message_stream: 1016 print(chunk) 1017 if hasattr(chunk, 'message_type'): 1018 if chunk.message_type == 'assistant_message': 1019 print(f"🤖 Agent response: {chunk.content}") 1020 response_text = chunk.content 1021 elif chunk.message_type == 'reasoning_message': 1022 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 1023 elif chunk.message_type == 'tool_call_message': 1024 print(f"🔧 Agent tool call: {chunk.tool_call.name}") 1025 1026 if response_text: 1027 print(f"\n✅ Agent generated response:") 1028 print(f"📝 Response: {response_text}") 1029 else: 1030 print("❌ No response generated by agent") 1031 1032 except Exception as e: 1033 print(f"Letta integration test failed: {e}") 1034 import traceback 1035 traceback.print_exc() 1036 1037def test_x_client(): 1038 """Test the X client by fetching mentions.""" 1039 try: 1040 client = create_x_client() 1041 mentions = client.get_mentions(max_results=5) 1042 1043 if mentions: 1044 print(f"Successfully retrieved {len(mentions)} mentions:") 1045 for mention in mentions: 1046 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...") 1047 else: 1048 print("No mentions retrieved") 1049 1050 except Exception as e: 1051 print(f"Test failed: {e}") 1052 1053def reply_to_cameron_post(): 1054 """ 1055 Reply to Cameron's specific X post. 1056 1057 NOTE: This requires OAuth User Context authentication, not Bearer token. 1058 Current Bearer token is Application-Only which can't post. 1059 """ 1060 try: 1061 client = create_x_client() 1062 1063 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1064 cameron_post_id = "1950690566909710618" 1065 1066 # Simple reply message 1067 reply_text = "Hello from void! 🤖 Testing X integration." 1068 1069 print(f"Attempting to reply to post {cameron_post_id}") 1070 print(f"Reply text: {reply_text}") 1071 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1072 print("Posting requires OAuth User Context authentication") 1073 1074 result = client.post_reply(reply_text, cameron_post_id) 1075 1076 if result: 1077 print(f"✅ Successfully posted reply!") 1078 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1079 else: 1080 print("❌ Failed to post reply (expected with current auth)") 1081 1082 except Exception as e: 1083 print(f"Reply failed: {e}") 1084 1085def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1086 """ 1087 Process an X mention and generate a reply using the Letta agent. 1088 Similar to bsky.py process_mention but for X/Twitter. 1089 1090 Args: 1091 void_agent: The Letta agent instance 1092 x_client: The X API client 1093 mention_data: The mention data dictionary 1094 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1095 testing_mode: If True, don't actually post to X 1096 1097 Returns: 1098 True: Successfully processed, remove from queue 1099 False: Failed but retryable, keep in queue 1100 None: Failed with non-retryable error, move to errors directory 1101 "no_reply": No reply was generated, move to no_reply directory 1102 """ 1103 try: 1104 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1105 1106 # Extract mention details 1107 if isinstance(mention_data, dict): 1108 # Handle both raw mention and queued mention formats 1109 if 'mention' in mention_data: 1110 mention = mention_data['mention'] 1111 else: 1112 mention = mention_data 1113 else: 1114 mention = mention_data 1115 1116 mention_id = mention.get('id') 1117 mention_text = mention.get('text', '') 1118 author_id = mention.get('author_id') 1119 conversation_id = mention.get('conversation_id') 1120 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1121 referenced_tweets = mention.get('referenced_tweets', []) 1122 1123 # Check downrank list - only respond to downranked users 10% of the time 1124 downrank_users = load_downrank_users() 1125 if not should_respond_to_downranked_user(str(author_id), downrank_users): 1126 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection") 1127 return "no_reply" 1128 1129 # Enhanced conversation tracking for debug - especially important for Grok handling 1130 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1131 logger.info(f" Author ID: {author_id}") 1132 logger.info(f" Conversation ID: {conversation_id}") 1133 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1134 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1135 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1136 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1137 logger.info(f" Text preview: {mention_text[:100]}...") 1138 1139 if not conversation_id: 1140 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues") 1141 return None 1142 1143 # Get thread context (disable cache for missing context issues) 1144 # Use mention_id as until_id to exclude tweets that occurred after this mention 1145 try: 1146 thread_data = x_client.get_thread_context(conversation_id, use_cache=False, until_id=mention_id) 1147 if not thread_data: 1148 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1149 return False 1150 1151 # If this mention references a specific tweet, ensure we have that tweet in context 1152 if referenced_tweets: 1153 for ref in referenced_tweets: 1154 if ref.get('type') == 'replied_to': 1155 ref_id = ref.get('id') 1156 # Check if the referenced tweet is in our thread data 1157 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1158 if ref_id and ref_id not in thread_tweet_ids: 1159 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1160 try: 1161 # Fetch the missing referenced tweet directly 1162 endpoint = f"/tweets/{ref_id}" 1163 params = { 1164 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1165 "user.fields": "id,name,username", 1166 "expansions": "author_id" 1167 } 1168 response = x_client._make_request(endpoint, params) 1169 if response and "data" in response: 1170 missing_tweet = response["data"] 1171 if missing_tweet.get('conversation_id') == conversation_id: 1172 # Add to thread data 1173 if 'tweets' not in thread_data: 1174 thread_data['tweets'] = [] 1175 thread_data['tweets'].append(missing_tweet) 1176 1177 # Add user data if available 1178 if "includes" in response and "users" in response["includes"]: 1179 if 'users' not in thread_data: 1180 thread_data['users'] = {} 1181 for user in response["includes"]["users"]: 1182 thread_data['users'][user["id"]] = user 1183 1184 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1185 else: 1186 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}") 1187 except Exception as e: 1188 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1189 1190 # Enhanced thread context debugging 1191 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1192 thread_posts = thread_data.get('tweets', []) 1193 thread_users = thread_data.get('users', {}) 1194 logger.info(f" Posts in thread: {len(thread_posts)}") 1195 logger.info(f" Users in thread: {len(thread_users)}") 1196 1197 # Log thread participants for Grok detection 1198 for user_id, user_info in thread_users.items(): 1199 username = user_info.get('username', 'unknown') 1200 name = user_info.get('name', 'Unknown') 1201 is_verified = user_info.get('verified', False) 1202 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1203 1204 # Special logging for Grok or AI-related users 1205 if 'grok' in username.lower() or 'grok' in name.lower(): 1206 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1207 1208 # Log conversation structure 1209 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1210 post_id = post.get('id') 1211 post_author = post.get('author_id') 1212 post_text = post.get('text', '')[:50] 1213 is_reply = 'in_reply_to_user_id' in post 1214 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1215 1216 except Exception as e: 1217 logger.error(f"❌ Error getting thread context: {e}") 1218 return False 1219 1220 # Convert to YAML string 1221 thread_context = thread_to_yaml_string(thread_data) 1222 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1223 1224 # Save comprehensive conversation data for debugging 1225 try: 1226 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1227 debug_dir.mkdir(parents=True, exist_ok=True) 1228 1229 # Save raw thread data (JSON) 1230 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 1231 json.dump(thread_data, f, indent=2) 1232 1233 # Save YAML thread context 1234 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1235 f.write(thread_context) 1236 1237 # Save mention processing debug info 1238 debug_info = { 1239 'processed_at': datetime.now().isoformat(), 1240 'mention_id': mention_id, 1241 'conversation_id': conversation_id, 1242 'author_id': author_id, 1243 'in_reply_to_user_id': in_reply_to_user_id, 1244 'referenced_tweets': referenced_tweets, 1245 'thread_stats': { 1246 'total_posts': len(thread_posts), 1247 'total_users': len(thread_users), 1248 'yaml_length': len(thread_context) 1249 }, 1250 'users_in_conversation': { 1251 user_id: { 1252 'username': user_info.get('username'), 1253 'name': user_info.get('name'), 1254 'verified': user_info.get('verified', False), 1255 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1256 } 1257 for user_id, user_info in thread_users.items() 1258 } 1259 } 1260 1261 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1262 json.dump(debug_info, f, indent=2) 1263 1264 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1265 1266 except Exception as debug_error: 1267 logger.warning(f"Failed to save debug data: {debug_error}") 1268 # Continue processing even if debug save fails 1269 1270 # Check for #voidstop 1271 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1272 logger.info("Found #voidstop, skipping this mention") 1273 return True 1274 1275 # Ensure X user blocks are attached 1276 try: 1277 ensure_x_user_blocks_attached(thread_data, void_agent.id) 1278 except Exception as e: 1279 logger.warning(f"Failed to ensure X user blocks: {e}") 1280 # Continue without user blocks rather than failing completely 1281 1282 # Create prompt for Letta agent 1283 author_info = thread_data.get('users', {}).get(author_id, {}) 1284 author_username = author_info.get('username', 'unknown') 1285 author_name = author_info.get('name', author_username) 1286 1287 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1288 1289MOST RECENT POST (the mention you're responding to): 1290"{mention_text}" 1291 1292FULL THREAD CONTEXT: 1293```yaml 1294{thread_context} 1295``` 1296 1297The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1298 1299If you need to update user information, use the x_user_* tools. 1300 1301To reply, use the add_post_to_x_thread tool: 1302- Each call creates one post (max 280 characters) 1303- For most responses, a single call is sufficient 1304- Only use multiple calls for threaded replies when: 1305 * The topic requires extended explanation that cannot fit in 280 characters 1306 * You're explicitly asked for a detailed/long response 1307 * The conversation naturally benefits from a structured multi-part answer 1308- Avoid unnecessary threads - be concise when possible""" 1309 1310 # Log mention processing 1311 title = f"X MENTION FROM @{author_username}" 1312 print(f"\n{title}") 1313 print(f" {'' * len(title)}") 1314 for line in mention_text.split('\n'): 1315 print(f" {line}") 1316 1317 # Send to Letta agent 1318 from config_loader import get_letta_config 1319 from letta_client import Letta 1320 1321 config = get_letta_config() 1322 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1323 1324 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars") 1325 1326 try: 1327 # Use streaming to avoid timeout errors 1328 message_stream = letta_client.agents.messages.create_stream( 1329 agent_id=void_agent.id, 1330 messages=[{"role": "user", "content": prompt}], 1331 stream_tokens=False, 1332 max_steps=100 1333 ) 1334 1335 # Collect streaming response (simplified version of bsky.py logic) 1336 all_messages = [] 1337 for chunk in message_stream: 1338 if hasattr(chunk, 'message_type'): 1339 if chunk.message_type == 'reasoning_message': 1340 print("\n◆ Reasoning") 1341 print(" ─────────") 1342 for line in chunk.reasoning.split('\n'): 1343 print(f" {line}") 1344 elif chunk.message_type == 'tool_call_message': 1345 tool_name = chunk.tool_call.name 1346 if tool_name == 'add_post_to_x_thread': 1347 try: 1348 args = json.loads(chunk.tool_call.arguments) 1349 text = args.get('text', '') 1350 if text: 1351 print("\n✎ X Post") 1352 print(" ────────") 1353 for line in text.split('\n'): 1354 print(f" {line}") 1355 except: 1356 pass 1357 elif tool_name == 'halt_activity': 1358 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1359 if queue_filepath and queue_filepath.exists(): 1360 queue_filepath.unlink() 1361 logger.info(f"Deleted queue file: {queue_filepath.name}") 1362 logger.info("=== X BOT TERMINATED BY AGENT ===") 1363 exit(0) 1364 elif chunk.message_type == 'tool_return_message': 1365 tool_name = chunk.name 1366 status = chunk.status 1367 if status == 'success' and tool_name == 'add_post_to_x_thread': 1368 print("\n✓ X Post Queued") 1369 print(" ──────────────") 1370 print(" Post queued successfully") 1371 elif chunk.message_type == 'assistant_message': 1372 print("\n▶ Assistant Response") 1373 print(" ──────────────────") 1374 for line in chunk.content.split('\n'): 1375 print(f" {line}") 1376 1377 all_messages.append(chunk) 1378 if str(chunk) == 'done': 1379 break 1380 1381 # Convert streaming response for compatibility 1382 message_response = type('StreamingResponse', (), { 1383 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1384 })() 1385 1386 except Exception as api_error: 1387 logger.error(f"Letta API error: {api_error}") 1388 raise 1389 1390 # Extract successful add_post_to_x_thread tool calls 1391 reply_candidates = [] 1392 tool_call_results = {} 1393 ignored_notification = False 1394 ack_note = None # Track any note from annotate_ack tool 1395 1396 # First pass: collect tool return statuses 1397 for message in message_response.messages: 1398 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1399 if message.name == 'add_post_to_x_thread': 1400 tool_call_results[message.tool_call_id] = message.status 1401 elif message.name == 'ignore_notification': 1402 if message.status == 'success': 1403 ignored_notification = True 1404 logger.info("🚫 X notification ignored") 1405 1406 # Second pass: collect successful tool calls 1407 for message in message_response.messages: 1408 if hasattr(message, 'tool_call') and message.tool_call: 1409 # Collect annotate_ack tool calls 1410 if message.tool_call.name == 'annotate_ack': 1411 try: 1412 args = json.loads(message.tool_call.arguments) 1413 note = args.get('note', '') 1414 if note: 1415 ack_note = note 1416 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1417 except json.JSONDecodeError as e: 1418 logger.error(f"Failed to parse annotate_ack arguments: {e}") 1419 1420 # Collect add_post_to_x_thread tool calls - only if they were successful 1421 elif message.tool_call.name == 'add_post_to_x_thread': 1422 tool_call_id = message.tool_call.tool_call_id 1423 tool_status = tool_call_results.get(tool_call_id, 'unknown') 1424 1425 if tool_status == 'success': 1426 try: 1427 args = json.loads(message.tool_call.arguments) 1428 reply_text = args.get('text', '') 1429 if reply_text: 1430 reply_candidates.append(reply_text) 1431 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1432 except json.JSONDecodeError as e: 1433 logger.error(f"Failed to parse tool call arguments: {e}") 1434 1435 # Save agent response data to debug folder 1436 try: 1437 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1438 1439 # Save complete agent interaction 1440 agent_response_data = { 1441 'processed_at': datetime.now().isoformat(), 1442 'mention_id': mention_id, 1443 'conversation_id': conversation_id, 1444 'prompt_sent': prompt, 1445 'reply_candidates': reply_candidates, 1446 'ignored_notification': ignored_notification, 1447 'ack_note': ack_note, 1448 'tool_call_results': tool_call_results, 1449 'all_messages': [] 1450 } 1451 1452 # Convert messages to serializable format 1453 for message in message_response.messages: 1454 msg_data = { 1455 'message_type': getattr(message, 'message_type', 'unknown'), 1456 'content': getattr(message, 'content', ''), 1457 'reasoning': getattr(message, 'reasoning', ''), 1458 'status': getattr(message, 'status', ''), 1459 'name': getattr(message, 'name', ''), 1460 } 1461 1462 if hasattr(message, 'tool_call') and message.tool_call: 1463 msg_data['tool_call'] = { 1464 'name': message.tool_call.name, 1465 'arguments': message.tool_call.arguments, 1466 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '') 1467 } 1468 1469 agent_response_data['all_messages'].append(msg_data) 1470 1471 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f: 1472 json.dump(agent_response_data, f, indent=2) 1473 1474 logger.info(f"💾 Saved agent response debug data") 1475 1476 except Exception as debug_error: 1477 logger.warning(f"Failed to save agent response debug data: {debug_error}") 1478 1479 # Handle conflicts 1480 if reply_candidates and ignored_notification: 1481 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1482 return False 1483 1484 if reply_candidates: 1485 # Post replies to X 1486 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1487 1488 if len(reply_candidates) == 1: 1489 content = reply_candidates[0] 1490 title = f"Reply to @{author_username}" 1491 else: 1492 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1493 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1494 1495 print(f"\n{title}") 1496 print(f" {'' * len(title)}") 1497 for line in content.split('\n'): 1498 print(f" {line}") 1499 1500 if testing_mode: 1501 logger.info("TESTING MODE: Skipping actual X post") 1502 return True 1503 else: 1504 # Post to X using thread approach 1505 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1506 if success: 1507 logger.info(f"Successfully replied to @{author_username} on X") 1508 1509 # Acknowledge the post we're replying to 1510 try: 1511 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1512 if ack_result: 1513 if ack_note: 1514 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1515 else: 1516 logger.info(f"Successfully acknowledged X post from @{author_username}") 1517 else: 1518 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1519 except Exception as e: 1520 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1521 # Don't fail the entire operation if acknowledgment fails 1522 1523 return True 1524 else: 1525 logger.error(f"Failed to send reply to @{author_username} on X") 1526 return False 1527 else: 1528 if ignored_notification: 1529 logger.info(f"X mention from @{author_username} was explicitly ignored") 1530 return "ignored" 1531 else: 1532 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1533 return False # Keep in queue for retry instead of removing 1534 1535 except Exception as e: 1536 logger.error(f"Error processing X mention: {e}") 1537 return False 1538 1539def acknowledge_x_post(x_client, post_id, note=None): 1540 """ 1541 Acknowledge an X post that we replied to. 1542 Uses the same Bluesky client and uploads to the void data repository on atproto, 1543 just like Bluesky acknowledgments. 1544 1545 Args: 1546 x_client: XClient instance (not used, kept for compatibility) 1547 post_id: The X post ID we're acknowledging 1548 note: Optional note to include with the acknowledgment 1549 1550 Returns: 1551 True if successful, False otherwise 1552 """ 1553 try: 1554 # Use Bluesky client to upload acks to the void data repository on atproto 1555 bsky_client = bsky_utils.default_login() 1556 1557 # Create a synthetic URI and CID for the X post 1558 # X posts don't have atproto URIs/CIDs, so we create identifiers 1559 post_uri = f"x://twitter.com/post/{post_id}" 1560 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1561 1562 # Use the same acknowledge_post function as Bluesky 1563 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1564 1565 if ack_result: 1566 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1567 return True 1568 else: 1569 logger.error(f"Failed to acknowledge X post {post_id}") 1570 return False 1571 1572 except Exception as e: 1573 logger.error(f"Error acknowledging X post {post_id}: {e}") 1574 return False 1575 1576def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1577 """ 1578 Post a series of replies to X, threading them properly. 1579 1580 Args: 1581 x_client: XClient instance 1582 in_reply_to_tweet_id: The original tweet ID to reply to 1583 reply_messages: List of reply text strings 1584 1585 Returns: 1586 True if successful, False otherwise 1587 """ 1588 try: 1589 current_reply_id = in_reply_to_tweet_id 1590 1591 for i, reply_text in enumerate(reply_messages): 1592 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1593 1594 result = x_client.post_reply(reply_text, current_reply_id) 1595 1596 if result and 'data' in result: 1597 new_tweet_id = result['data']['id'] 1598 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1599 # For threading, the next reply should reply to this one 1600 current_reply_id = new_tweet_id 1601 else: 1602 logger.error(f"Failed to post X reply {i+1}") 1603 return False 1604 1605 return True 1606 1607 except Exception as e: 1608 logger.error(f"Error posting X thread replies: {e}") 1609 return False 1610 1611def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1612 """ 1613 Load and process all X mentions from the queue. 1614 Similar to bsky.py load_and_process_queued_notifications but for X. 1615 """ 1616 try: 1617 # Get all X mention files in queue directory 1618 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1619 1620 if not queue_files: 1621 return 1622 1623 # Load file metadata and sort by creation time (chronological order) 1624 file_metadata = [] 1625 for filepath in queue_files: 1626 try: 1627 with open(filepath, 'r') as f: 1628 queue_data = json.load(f) 1629 mention_data = queue_data.get('mention', queue_data) 1630 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing 1631 file_metadata.append((created_at, filepath)) 1632 except Exception as e: 1633 logger.warning(f"Error reading queue file {filepath.name}: {e}") 1634 # Add with default timestamp so it still gets processed 1635 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath)) 1636 1637 # Sort by creation time (oldest first) 1638 file_metadata.sort(key=lambda x: x[0]) 1639 1640 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order") 1641 1642 for i, (created_at, filepath) in enumerate(file_metadata, 1): 1643 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})") 1644 1645 try: 1646 # Load mention data 1647 with open(filepath, 'r') as f: 1648 queue_data = json.load(f) 1649 1650 mention_data = queue_data.get('mention', queue_data) 1651 1652 # Process the mention 1653 success = process_x_mention(void_agent, x_client, mention_data, 1654 queue_filepath=filepath, testing_mode=testing_mode) 1655 1656 except XRateLimitError: 1657 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning") 1658 break 1659 1660 except Exception as e: 1661 logger.error(f"Error processing X queue file {filepath.name}: {e}") 1662 continue 1663 1664 # Handle file based on processing result 1665 if success: 1666 if testing_mode: 1667 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1668 else: 1669 filepath.unlink() 1670 logger.info(f"Successfully processed and removed X file: {filepath.name}") 1671 1672 # Mark as processed 1673 processed_mentions = load_processed_mentions() 1674 processed_mentions.add(mention_data.get('id')) 1675 save_processed_mentions(processed_mentions) 1676 1677 elif success is None: # Move to error directory 1678 error_dir = X_QUEUE_DIR / "errors" 1679 error_dir.mkdir(exist_ok=True) 1680 error_path = error_dir / filepath.name 1681 filepath.rename(error_path) 1682 logger.warning(f"Moved X file {filepath.name} to errors directory") 1683 1684 elif success == "no_reply": # Move to no_reply directory 1685 no_reply_dir = X_QUEUE_DIR / "no_reply" 1686 no_reply_dir.mkdir(exist_ok=True) 1687 no_reply_path = no_reply_dir / filepath.name 1688 filepath.rename(no_reply_path) 1689 logger.info(f"Moved X file {filepath.name} to no_reply directory") 1690 1691 elif success == "ignored": # Delete ignored notifications 1692 filepath.unlink() 1693 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1694 1695 else: 1696 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 1697 1698 except Exception as e: 1699 logger.error(f"Error loading queued X mentions: {e}") 1700 1701def process_x_notifications(void_agent, x_client, testing_mode=False): 1702 """ 1703 Fetch new X mentions, queue them, and process the queue. 1704 Similar to bsky.py process_notifications but for X. 1705 """ 1706 try: 1707 # Get username for fetching mentions 1708 user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 1709 if not user_info or "data" not in user_info: 1710 logger.error("Could not get username for X mentions") 1711 return 1712 1713 username = user_info["data"]["username"] 1714 1715 # Fetch and queue new mentions 1716 new_count = fetch_and_queue_mentions(username) 1717 1718 if new_count > 0: 1719 logger.info(f"Found {new_count} new X mentions to process") 1720 1721 # Process the entire queue 1722 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1723 1724 except Exception as e: 1725 logger.error(f"Error processing X notifications: {e}") 1726 1727def initialize_x_void(): 1728 """Initialize the void agent for X operations.""" 1729 logger.info("Starting void agent initialization for X...") 1730 1731 from config_loader import get_letta_config 1732 from letta_client import Letta 1733 1734 # Get config 1735 config = get_letta_config() 1736 client = Letta(token=config['api_key'], timeout=config['timeout']) 1737 agent_id = config['agent_id'] 1738 1739 try: 1740 void_agent = client.agents.retrieve(agent_id=agent_id) 1741 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 1742 except Exception as e: 1743 logger.error(f"Failed to load void agent {agent_id}: {e}") 1744 raise e 1745 1746 # Ensure correct tools are attached for X 1747 logger.info("Configuring tools for X platform...") 1748 try: 1749 from tool_manager import ensure_platform_tools 1750 ensure_platform_tools('x', void_agent.id) 1751 except Exception as e: 1752 logger.error(f"Failed to configure platform tools: {e}") 1753 logger.warning("Continuing with existing tool configuration") 1754 1755 # Log agent details 1756 logger.info(f"X Void agent details - ID: {void_agent.id}") 1757 logger.info(f"Agent name: {void_agent.name}") 1758 1759 return void_agent 1760 1761def x_main_loop(testing_mode=False): 1762 """ 1763 Main X bot loop that continuously monitors for mentions and processes them. 1764 Similar to bsky.py main() but for X/Twitter. 1765 """ 1766 import time 1767 from time import sleep 1768 1769 logger.info("=== STARTING X VOID BOT ===") 1770 1771 # Initialize void agent 1772 void_agent = initialize_x_void() 1773 logger.info(f"X void agent initialized: {void_agent.id}") 1774 1775 # Initialize X client 1776 x_client = create_x_client() 1777 logger.info("Connected to X API") 1778 1779 # Main loop 1780 FETCH_DELAY_SEC = 60 # Check every minute for X mentions 1781 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 1782 1783 if testing_mode: 1784 logger.info("=== RUNNING IN X TESTING MODE ===") 1785 logger.info(" - No messages will be sent to X") 1786 logger.info(" - Queue files will not be deleted") 1787 1788 cycle_count = 0 1789 start_time = time.time() 1790 1791 while True: 1792 try: 1793 cycle_count += 1 1794 logger.info(f"=== X CYCLE {cycle_count} ===") 1795 1796 # Process X notifications (fetch, queue, and process) 1797 process_x_notifications(void_agent, x_client, testing_mode) 1798 1799 # Log cycle completion 1800 elapsed_time = time.time() - start_time 1801 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 1802 1803 sleep(FETCH_DELAY_SEC) 1804 1805 except KeyboardInterrupt: 1806 elapsed_time = time.time() - start_time 1807 logger.info("=== X BOT STOPPED BY USER ===") 1808 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 1809 break 1810 except Exception as e: 1811 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 1812 logger.error(f"Error details: {e}") 1813 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 1814 sleep(FETCH_DELAY_SEC * 2) 1815 1816def process_queue_only(testing_mode=False): 1817 """ 1818 Process all queued X mentions without fetching new ones. 1819 Useful for rate limit management - queue first, then process separately. 1820 1821 Args: 1822 testing_mode: If True, don't actually post to X and keep queue files 1823 """ 1824 logger.info("=== PROCESSING X QUEUE ONLY ===") 1825 1826 if testing_mode: 1827 logger.info("=== RUNNING IN X TESTING MODE ===") 1828 logger.info(" - No messages will be sent to X") 1829 logger.info(" - Queue files will not be deleted") 1830 1831 try: 1832 # Initialize void agent 1833 void_agent = initialize_x_void() 1834 logger.info(f"X void agent initialized: {void_agent.id}") 1835 1836 # Initialize X client 1837 x_client = create_x_client() 1838 logger.info("Connected to X API") 1839 1840 # Process the queue without fetching new mentions 1841 logger.info("Processing existing X queue...") 1842 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1843 1844 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 1845 1846 except Exception as e: 1847 logger.error(f"Error processing X queue: {e}") 1848 raise 1849 1850def x_notification_loop(): 1851 """ 1852 DEPRECATED: Old X notification loop using search-based mention detection. 1853 Use x_main_loop() instead for the full bot experience. 1854 """ 1855 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 1856 x_main_loop() 1857 1858if __name__ == "__main__": 1859 import sys 1860 import argparse 1861 1862 if len(sys.argv) > 1: 1863 if sys.argv[1] == "bot": 1864 # Main bot with optional --test flag 1865 parser = argparse.ArgumentParser(description='X Void Bot') 1866 parser.add_argument('command', choices=['bot']) 1867 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 1868 args = parser.parse_args() 1869 x_main_loop(testing_mode=args.test) 1870 elif sys.argv[1] == "loop": 1871 x_notification_loop() 1872 elif sys.argv[1] == "reply": 1873 reply_to_cameron_post() 1874 elif sys.argv[1] == "me": 1875 get_my_user_info() 1876 elif sys.argv[1] == "search": 1877 test_search_mentions() 1878 elif sys.argv[1] == "queue": 1879 test_fetch_and_queue() 1880 elif sys.argv[1] == "thread": 1881 test_thread_context() 1882 elif sys.argv[1] == "process": 1883 # Process all queued mentions with optional --test flag 1884 testing_mode = "--test" in sys.argv 1885 process_queue_only(testing_mode=testing_mode) 1886 elif sys.argv[1] == "letta": 1887 # Use specific agent ID if provided, otherwise use from config 1888 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 1889 test_letta_integration(agent_id) 1890 elif sys.argv[1] == "downrank": 1891 # View or manage downrank list 1892 if len(sys.argv) > 2 and sys.argv[2] == "list": 1893 downrank_users = load_downrank_users() 1894 if downrank_users: 1895 print(f"📋 Downrank users ({len(downrank_users)} total):") 1896 for user_id in sorted(downrank_users): 1897 print(f" - {user_id}") 1898 else: 1899 print("📋 No downrank users configured") 1900 else: 1901 print("Usage: python x.py downrank list") 1902 print(" list - Show all downranked user IDs") 1903 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list") 1904 else: 1905 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]") 1906 print(" bot - Run the main X bot (use --test for testing mode)") 1907 print(" Example: python x.py bot --test") 1908 print(" queue - Fetch and queue mentions only (no processing)") 1909 print(" process - Process all queued mentions only (no fetching)") 1910 print(" Example: python x.py process --test") 1911 print(" downrank - Manage downrank users (10% response rate)") 1912 print(" Example: python x.py downrank list") 1913 print(" loop - Run the old notification monitoring loop (deprecated)") 1914 print(" reply - Reply to Cameron's specific post") 1915 print(" me - Get authenticated user info and correct user ID") 1916 print(" search - Test search-based mention detection") 1917 print(" thread - Test thread context retrieval from queued mention") 1918 print(" letta - Test sending thread context to Letta agent") 1919 print(" Optional: python x.py letta <agent-id>") 1920 else: 1921 test_x_client()