a digital person for bluesky
import os
import logging
import requests
import yaml
import json
import hashlib
import random
import time
from typing import Optional, Dict, Any, List, Set
from datetime import datetime
from pathlib import Path
from requests_oauthlib import OAuth1
from rich import print as rprint
from rich.panel import Panel
from rich.text import Text

import bsky_utils

class XRateLimitError(Exception):
    """Exception raised when X API rate limit is exceeded"""
    pass


# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")

# X-specific file paths
X_QUEUE_DIR = Path("x_queue")
X_CACHE_DIR = Path("x_cache")
X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")

class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions."""

    def __init__(self, api_key: str, user_id: str, access_token: str = None,
                 consumer_key: str = None, consumer_secret: str = None,
                 access_token_secret: str = None):
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"

        # Check if we have OAuth 1.0a credentials
        if (consumer_key and consumer_secret and access_token and access_token_secret):
            # Use OAuth 1.0a for User Context
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Use Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
        """Make a request to the X API with proper error handling and exponential backoff."""
        url = f"{self.base_url}{endpoint}"

        for attempt in range(max_retries):
            try:
                if method.upper() == "GET":
                    if self.oauth:
                        response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
                    else:
                        response = requests.get(url, headers=self.headers, params=params)
                elif method.upper() == "POST":
                    if self.oauth:
                        response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
                    else:
                        response = requests.post(url, headers=self.headers, json=data)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                response.raise_for_status()
                return response.json()

            except requests.exceptions.HTTPError as e:
                if response.status_code == 401:
                    logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
                    logger.error(f"Response: {response.text}")
                    return None  # Don't retry auth failures
                elif response.status_code == 403:
                    logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
                    logger.error(f"Response: {response.text}")
                    return None  # Don't retry permission failures
                elif response.status_code == 429:
                    if attempt < max_retries - 1:
                        # Exponential backoff: 60s, 120s, 240s
                        backoff_time = 60 * (2 ** attempt)
                        logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
                        logger.error(f"Response: {response.text}")
                        time.sleep(backoff_time)
                        continue
                    else:
                        logger.error("X API rate limit exceeded - max retries reached")
                        logger.error(f"Response: {response.text}")
                        raise XRateLimitError("X API rate limit exceeded")
                else:
                    if attempt < max_retries - 1:
                        # Exponential backoff for other HTTP errors too
                        backoff_time = 30 * (2 ** attempt)
                        logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
                        logger.error(f"Response: {response.text}")
                        time.sleep(backoff_time)
                        continue
                    else:
                        logger.error(f"X API request failed after {max_retries} attempts: {e}")
                        logger.error(f"Response: {response.text}")
                        return None

            except Exception as e:
                if attempt < max_retries - 1:
                    backoff_time = 15 * (2 ** attempt)
                    logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
                    time.sleep(backoff_time)
                    continue
                else:
                    logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
                    return None

        return None

    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
        """
        Fetch mentions for the configured user.

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            List of mention objects or None if request failed
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Ensure within API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if response:
            logger.debug(f"X API response: {response}")

        if response and "data" in response:
            mentions = response["data"]
            logger.info(f"Retrieved {len(mentions)} mentions")
            return mentions
        else:
            if response:
                logger.info(f"No mentions in response. Full response: {response}")
            else:
                logger.warning("Request failed - no response received")
            return []

    def get_user_info(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user."""
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]:
        """
        Search for mentions using the search endpoint instead of the mentions endpoint.
        This might have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username
        """
        endpoint = "/tweets/search/recent"

        # Search for mentions of the username
        query = f"@{username}"

        params = {
            "query": query,
            "max_results": min(max(max_results, 10), 100),
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Searching for mentions of @{username}")
        response = self._make_request(endpoint, params)

        if response and "data" in response:
            tweets = response["data"]
            logger.info(f"Found {len(tweets)} mentions via search")
            return tweets
        else:
            if response:
                logger.info(f"No mentions found via search. Response: {response}")
            else:
                logger.warning("Search request failed")
            return []

    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)

        Returns:
            Dict with 'tweets' (ordered chronologically) and 'users' keys, or None if no tweets were found
        """
        # Check cache first if enabled
        if use_cache:
            cached_data = get_cached_thread_context(conversation_id)
            if cached_data:
                return cached_data

        # First, get the original tweet directly since it might not appear in conversation search
        original_tweet = None
        try:
            endpoint = f"/tweets/{conversation_id}"
            params = {
                "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                "user.fields": "id,name,username",
                "expansions": "author_id"
            }
            response = self._make_request(endpoint, params)
            if response and "data" in response:
                original_tweet = response["data"]
                logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
        except Exception as e:
            logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")

        # Then search for all tweets in this conversation
        endpoint = "/tweets/search/recent"
        params = {
            "query": f"conversation_id:{conversation_id}",
            "max_results": 100,  # Get as many as possible
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
            "sort_order": "recency"  # Get newest first, we'll reverse later
        }

        # Add until_id parameter to exclude tweets after the mention being processed
        if until_id:
            params["until_id"] = until_id
            logger.info(f"Using until_id={until_id} to exclude future tweets")

        logger.info(f"Fetching thread context for conversation {conversation_id}")
        response = self._make_request(endpoint, params)

        tweets = []
        users_data = {}

        # Collect tweets from search
        if response and "data" in response:
            tweets.extend(response["data"])
            # Store user data for reference
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user

        # Add original tweet if we got it and it's not already in the list
        if original_tweet:
            tweet_ids = [t.get('id') for t in tweets]
            if original_tweet.get('id') not in tweet_ids:
                tweets.append(original_tweet)
                logger.info("Added original tweet to thread context")

        # Attempt to fill gaps by fetching referenced tweets that are missing
        # This helps with X API's incomplete conversation search results
        tweet_ids = set(t.get('id') for t in tweets)
        missing_tweet_ids = set()
        critical_missing_ids = set()

        # Collect referenced tweet IDs, prioritizing critical ones
        for tweet in tweets:
            referenced_tweets = tweet.get('referenced_tweets', [])
            for ref in referenced_tweets:
                ref_id = ref.get('id')
                ref_type = ref.get('type')
                if ref_id and ref_id not in tweet_ids:
                    missing_tweet_ids.add(ref_id)
                    # Prioritize direct replies and quoted tweets over retweets
                    if ref_type in ['replied_to', 'quoted']:
                        critical_missing_ids.add(ref_id)

        # For rate limit efficiency, only fetch critical missing tweets if we have many
        if len(missing_tweet_ids) > 10:
            logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
            missing_tweet_ids = critical_missing_ids

        # Context sufficiency check - skip backfill if we already have enough context
        if has_sufficient_context(tweets, missing_tweet_ids):
            logger.info("Thread has sufficient context, skipping missing tweet backfill")
            missing_tweet_ids = set()

        # Fetch missing referenced tweets in batches (more rate-limit friendly)
        if missing_tweet_ids:
            missing_list = list(missing_tweet_ids)

            # First, check cache for missing tweets
            cached_tweets = get_cached_tweets(missing_list)
            for tweet_id, cached_tweet in cached_tweets.items():
                if cached_tweet.get('conversation_id') == conversation_id:
                    tweets.append(cached_tweet)
                    tweet_ids.add(tweet_id)
                    logger.info(f"Retrieved missing tweet from cache: {tweet_id}")

                    # Add user data if available in cache
                    if cached_tweet.get('author_info'):
                        author_id = cached_tweet.get('author_id')
                        if author_id:
                            users_data[author_id] = cached_tweet['author_info']

            # Only fetch tweets that weren't found in cache
            uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]

            if uncached_ids:
                batch_size = 100  # X API limit for bulk tweet lookup

                for i in range(0, len(uncached_ids), batch_size):
                    batch_ids = uncached_ids[i:i + batch_size]
                    try:
                        endpoint = "/tweets"
                        params = {
                            "ids": ",".join(batch_ids),
                            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                            "user.fields": "id,name,username",
                            "expansions": "author_id"
                        }
                        response = self._make_request(endpoint, params)

                        if response and "data" in response:
                            fetched_tweets = []
                            batch_users_data = {}

                            for missing_tweet in response["data"]:
                                # Only add if it's actually part of this conversation
                                if missing_tweet.get('conversation_id') == conversation_id:
                                    tweets.append(missing_tweet)
                                    tweet_ids.add(missing_tweet.get('id'))
                                    fetched_tweets.append(missing_tweet)
                                    logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")

                            # Add user data if available
                            if "includes" in response and "users" in response["includes"]:
                                for user in response["includes"]["users"]:
                                    users_data[user["id"]] = user
                                    batch_users_data[user["id"]] = user

                            # Cache the newly fetched tweets
                            if fetched_tweets:
                                save_cached_tweets(fetched_tweets, batch_users_data)

                            logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")

                        # Handle partial success - log any missing tweets that weren't found
                        if response and "errors" in response:
                            for error in response["errors"]:
                                logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")

                    except Exception as e:
                        logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
            else:
                logger.info(f"All {len(missing_list)} missing tweets found in cache")

        if tweets:
            # Filter out tweets that occur after until_id (if specified)
            if until_id:
                original_count = len(tweets)
                # Convert until_id to int for comparison (Twitter IDs are sequential)
                until_id_int = int(until_id)
                tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
                filtered_count = len(tweets)
                if original_count != filtered_count:
                    logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")

            # Sort chronologically (oldest first)
            tweets.sort(key=lambda x: x.get('created_at', ''))
            logger.info(f"Retrieved {len(tweets)} tweets in thread")

            thread_data = {"tweets": tweets, "users": users_data}

            # Cache individual tweets from the thread for future backfill
            save_cached_tweets(tweets, users_data)

            # Cache the result
            if use_cache:
                save_cached_thread_context(conversation_id, thread_data)

            return thread_data
        else:
            logger.warning("No tweets found for thread context")
            return None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
            return result
        else:
            logger.error("Failed to post reply")
            return None

    def post_tweet(self, tweet_text: str) -> Optional[Dict]:
        """
        Post a standalone tweet (not a reply).

        Args:
            tweet_text: The text content of the tweet (max 280 characters)

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        # Validate tweet length
        if len(tweet_text) > 280:
            logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
            return None

        payload = {
            "text": tweet_text
        }

        logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info("Successfully posted tweet")
            return result
        else:
            logger.error("Failed to post tweet")
            return None

def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
    """Load X configuration from config file."""
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)

        x_config = config.get('x', {})
        if not x_config.get('api_key') or not x_config.get('user_id'):
            raise ValueError("X API key and user_id must be configured in config.yaml")

        return x_config
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise

def create_x_client(config_path: str = "config.yaml") -> XClient:
    """Create and return an X client with configuration loaded from file."""
    config = load_x_config(config_path)
    return XClient(
        api_key=config['api_key'],
        user_id=config['user_id'],
        access_token=config.get('access_token'),
        consumer_key=config.get('consumer_key'),
        consumer_secret=config.get('consumer_secret'),
        access_token_secret=config.get('access_token_secret')
    )
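
# Illustrative config.yaml layout assumed by load_x_config()/create_x_client() above and by the
# Letta helpers below. The key names are taken from the lookups in this file; the values are
# placeholders, not real credentials:
#
#   x:
#     api_key: "YOUR_BEARER_TOKEN"
#     user_id: "1234567890"           # numeric X user ID (see get_my_user_info())
#     access_token: "..."             # optional, enables user-context auth
#     consumer_key: "..."             # optional, OAuth 1.0a credentials
#     consumer_secret: "..."
#     access_token_secret: "..."
#   letta:
#     api_key: "YOUR_LETTA_API_KEY"
#     agent_id: "agent-..."
#
# A minimal read-only usage sketch under those assumptions (the handle is a placeholder):
#
#   client = create_x_client()
#   for m in client.search_mentions("your_bot_handle", max_results=10) or []:
#       print(m["id"], m["text"][:80])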

def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Convert a mention object to a YAML string for better AI comprehension.
    Similar to thread_to_yaml_string in bsky_utils.py
    """
    # Extract relevant fields
    simplified_mention = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': mention.get('author_id'),
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id')
    }

    # Add user information if available
    if users_data and mention.get('author_id') in users_data:
        user = users_data[mention.get('author_id')]
        simplified_mention['author'] = {
            'username': user.get('username'),
            'name': user.get('name')
        }

    return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)

def thread_to_yaml_string(thread_data: Dict) -> str:
    """
    Convert X thread context to YAML string for AI comprehension.
    Similar to Bluesky's thread_to_yaml_string function.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()

    Returns:
        YAML string representation of the thread
    """
    if not thread_data or "tweets" not in thread_data:
        return "conversation: []\n"

    tweets = thread_data["tweets"]
    users_data = thread_data.get("users", {})

    simplified_thread = {
        "conversation": []
    }

    for tweet in tweets:
        # Get user info
        author_id = tweet.get('author_id')
        author_info = {}
        if author_id and author_id in users_data:
            user = users_data[author_id]
            author_info = {
                'username': user.get('username'),
                'name': user.get('name')
            }

        # Build tweet object (simplified for AI consumption)
        tweet_obj = {
            'text': tweet.get('text'),
            'created_at': tweet.get('created_at'),
            'author': author_info,
            'author_id': author_id  # Include user ID for block management
        }

        simplified_thread["conversation"].append(tweet_obj)

    return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)


def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
    """
    Ensure all users in the thread have their X user blocks attached.
    Creates blocks with initial content including their handle if they don't exist.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
        agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
    """
    if not thread_data or "users" not in thread_data:
        return

    try:
        from tools.blocks import attach_x_user_blocks, x_user_note_set
        from config_loader import get_letta_config
        from letta_client import Letta

        # Get Letta client and agent_id from config
        config = get_letta_config()
        client = Letta(token=config['api_key'], timeout=config['timeout'])

        # Use provided agent_id or get from config
        if agent_id is None:
            agent_id = config['agent_id']

        # Get agent info to create a mock agent_state for the functions
        class MockAgentState:
            def __init__(self, agent_id):
                self.id = agent_id

        agent_state = MockAgentState(agent_id)

        users_data = thread_data["users"]
        user_ids = list(users_data.keys())

        if not user_ids:
            return

        logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")

        # Get current blocks to check which users already have blocks with content
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        existing_user_blocks = {}

        for block in current_blocks:
            if block.label.startswith("x_user_"):
                user_id = block.label.replace("x_user_", "")
                existing_user_blocks[user_id] = block

        # Attach all user blocks (this will create missing ones with basic content)
        attach_result = attach_x_user_blocks(user_ids, agent_state)
        logger.info(f"X user block attachment result: {attach_result}")

        # For newly created blocks, update with user handle information
        for user_id in user_ids:
            if user_id not in existing_user_blocks:
                user_info = users_data[user_id]
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')

                # Set initial content with handle information
                initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."

                try:
                    x_user_note_set(user_id, initial_content, agent_state)
                    logger.info(f"Set initial content for X user {user_id} (@{username})")
                except Exception as e:
                    logger.error(f"Failed to set initial content for X user {user_id}: {e}")

    except Exception as e:
        logger.error(f"Error ensuring X user blocks: {e}")


# X Caching and Queue System Functions

def load_last_seen_id() -> Optional[str]:
    """Load the last seen mention ID for incremental fetching."""
    if X_LAST_SEEN_FILE.exists():
        try:
            with open(X_LAST_SEEN_FILE, 'r') as f:
                data = json.load(f)
                return data.get('last_seen_id')
        except Exception as e:
            logger.error(f"Error loading last seen ID: {e}")
    return None

def save_last_seen_id(mention_id: str):
    """Save the last seen mention ID."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_LAST_SEEN_FILE, 'w') as f:
            json.dump({
                'last_seen_id': mention_id,
                'updated_at': datetime.now().isoformat()
            }, f)
        logger.debug(f"Saved last seen ID: {mention_id}")
    except Exception as e:
        logger.error(f"Error saving last seen ID: {e}")

def load_processed_mentions() -> set:
    """Load the set of processed mention IDs."""
    if X_PROCESSED_MENTIONS_FILE.exists():
        try:
            with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
                data = json.load(f)
            # Keep only recent entries (last 10000)
            if len(data) > 10000:
                data = data[-10000:]
                save_processed_mentions(set(data))
            return set(data)
        except Exception as e:
            logger.error(f"Error loading processed mentions: {e}")
    return set()

def save_processed_mentions(processed_set: set):
    """Save the set of processed mention IDs."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed mentions: {e}")

def load_downrank_users() -> Set[str]:
    """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
    try:
        if not X_DOWNRANK_USERS_FILE.exists():
            return set()

        downrank_users = set()
        with open(X_DOWNRANK_USERS_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                # Skip empty lines and comments
                if line and not line.startswith('#'):
                    downrank_users.add(line)

        logger.info(f"Loaded {len(downrank_users)} downrank users")
        return downrank_users
    except Exception as e:
        logger.error(f"Error loading downrank users: {e}")
        return set()
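
# Illustrative x_downrank_users.txt, matching the parser in load_downrank_users() above
# (one user ID per line; blank lines and '#' comments are skipped). The IDs are made up:
#
#   # users whose mentions are only answered ~10% of the time
#   1234567890123456789
#   9876543210987654321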
741 """ 742 if user_id not in downrank_users: 743 return True 744 745 # 10% chance for downranked users 746 should_respond = random.random() < 0.1 747 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 748 return should_respond 749 750def save_mention_to_queue(mention: Dict): 751 """Save a mention to the queue directory for async processing.""" 752 try: 753 mention_id = mention.get('id') 754 if not mention_id: 755 logger.error("Mention missing ID, cannot queue") 756 return 757 758 # Check if already processed 759 processed_mentions = load_processed_mentions() 760 if mention_id in processed_mentions: 761 logger.debug(f"Mention {mention_id} already processed, skipping") 762 return 763 764 # Create queue directory 765 X_QUEUE_DIR.mkdir(exist_ok=True) 766 767 # Create filename using hash (similar to Bluesky system) 768 mention_str = json.dumps(mention, sort_keys=True) 769 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 770 filename = f"x_mention_{mention_hash}.json" 771 772 queue_file = X_QUEUE_DIR / filename 773 774 # Save mention data with enhanced debugging information 775 mention_data = { 776 'mention': mention, 777 'queued_at': datetime.now().isoformat(), 778 'type': 'x_mention', 779 # Debug info for conversation tracking 780 'debug_info': { 781 'mention_id': mention.get('id'), 782 'author_id': mention.get('author_id'), 783 'conversation_id': mention.get('conversation_id'), 784 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 785 'referenced_tweets': mention.get('referenced_tweets', []), 786 'text_preview': mention.get('text', '')[:200], 787 'created_at': mention.get('created_at'), 788 'public_metrics': mention.get('public_metrics', {}), 789 'context_annotations': mention.get('context_annotations', []) 790 } 791 } 792 793 with open(queue_file, 'w') as f: 794 json.dump(mention_data, f, indent=2) 795 796 logger.info(f"Queued X mention {mention_id} -> {filename}") 797 798 except Exception as e: 799 logger.error(f"Error saving mention to queue: {e}") 800 801# X Cache Functions 802def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 803 """Load cached thread context if available.""" 804 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 805 if cache_file.exists(): 806 try: 807 with open(cache_file, 'r') as f: 808 cached_data = json.load(f) 809 # Check if cache is recent (within 1 hour) 810 from datetime import datetime, timedelta 811 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 812 if datetime.now() - cached_time < timedelta(hours=1): 813 logger.info(f"Using cached thread context for {conversation_id}") 814 return cached_data.get('thread_data') 815 except Exception as e: 816 logger.warning(f"Error loading cached thread context: {e}") 817 return None 818 819def save_cached_thread_context(conversation_id: str, thread_data: Dict): 820 """Save thread context to cache.""" 821 try: 822 X_CACHE_DIR.mkdir(exist_ok=True) 823 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 824 825 cache_data = { 826 'conversation_id': conversation_id, 827 'thread_data': thread_data, 828 'cached_at': datetime.now().isoformat() 829 } 830 831 with open(cache_file, 'w') as f: 832 json.dump(cache_data, f, indent=2) 833 834 logger.debug(f"Cached thread context for {conversation_id}") 835 except Exception as e: 836 logger.error(f"Error caching thread context: {e}") 837 838def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]: 839 """ 840 Load cached individual tweets if 

def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
    """
    Load cached individual tweets if available.
    Returns dict mapping tweet_id -> tweet_data for found tweets.
    """
    cached_tweets = {}

    for tweet_id in tweet_ids:
        cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
        if cache_file.exists():
            try:
                with open(cache_file, 'r') as f:
                    cached_data = json.load(f)

                # Use longer cache times for older tweets (24 hours vs 1 hour)
                from datetime import datetime, timedelta
                cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
                tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')

                # Parse tweet creation time to determine age
                try:
                    from dateutil.parser import parse
                    tweet_age = datetime.now() - parse(tweet_created)
                    cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
                except Exception:
                    cache_duration = timedelta(hours=1)  # Default to 1 hour if parsing fails

                if datetime.now() - cached_time < cache_duration:
                    cached_tweets[tweet_id] = cached_data.get('tweet_data')
                    logger.debug(f"Using cached tweet {tweet_id}")

            except Exception as e:
                logger.warning(f"Error loading cached tweet {tweet_id}: {e}")

    return cached_tweets

def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
    """Save individual tweets to cache for future reuse."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)

        for tweet in tweets_data:
            tweet_id = tweet.get('id')
            if not tweet_id:
                continue

            cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"

            # Include user data if available
            tweet_with_user = tweet.copy()
            if users_data and tweet.get('author_id') in users_data:
                tweet_with_user['author_info'] = users_data[tweet.get('author_id')]

            cache_data = {
                'tweet_id': tweet_id,
                'tweet_data': tweet_with_user,
                'cached_at': datetime.now().isoformat()
            }

            with open(cache_file, 'w') as f:
                json.dump(cache_data, f, indent=2)

            logger.debug(f"Cached individual tweet {tweet_id}")

    except Exception as e:
        logger.error(f"Error caching individual tweets: {e}")

def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
    """
    Determine if we have sufficient context to skip backfilling missing tweets.

    Args:
        tweets: List of tweets already in the thread
        missing_tweet_ids: Set of missing tweet IDs we'd like to fetch

    Returns:
        True if context is sufficient, False if backfill is needed
    """
    # If no missing tweets, context is sufficient
    if not missing_tweet_ids:
        return True

    # If we have a substantial conversation (5+ tweets), likely sufficient
    if len(tweets) >= 5:
        logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
        return True

    # If only a few missing tweets and we have some context, might be enough
    if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
        logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
        return True

    # Check if we have conversational flow (mentions between users)
    has_conversation_flow = False
    for tweet in tweets:
        text = tweet.get('text', '').lower()
        # Look for mentions, replies, or conversational indicators
        if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1:
            has_conversation_flow = True
            break

    # If we have clear conversational flow and reasonable length, sufficient
    if has_conversation_flow and len(tweets) >= 2:
        logger.debug("Thread has conversational flow, considering sufficient")
        return True

    # Otherwise, we need to backfill
    logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
    return False

def fetch_and_queue_mentions(username: str) -> int:
    """
    Single-pass function to fetch new mentions and queue them.
    Returns number of new mentions found.
    """
    try:
        client = create_x_client()

        # Load last seen ID for incremental fetching
        last_seen_id = load_last_seen_id()

        logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")

        # Search for mentions
        mentions = client.search_mentions(
            username=username,
            since_id=last_seen_id,
            max_results=100  # Get as many as possible
        )

        if not mentions:
            logger.info("No new mentions found")
            return 0

        # Process mentions (newest first, so reverse to process oldest first)
        mentions.reverse()
        new_count = 0

        for mention in mentions:
            save_mention_to_queue(mention)
            new_count += 1

        # Update last seen ID to the most recent mention
        if mentions:
            most_recent_id = mentions[-1]['id']  # Last after reverse = most recent
            save_last_seen_id(most_recent_id)

        logger.info(f"Queued {new_count} new X mentions")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queuing mentions: {e}")
        return 0
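
# Illustrative polling loop built on fetch_and_queue_mentions(). How the single-pass fetch is
# actually scheduled is not shown in this file, so this is only a sketch; the handle and the
# intervals are placeholders:
#
#   while True:
#       try:
#           fetch_and_queue_mentions("your_bot_handle")
#       except XRateLimitError:
#           time.sleep(15 * 60)  # back off for a full rate-limit window
#       time.sleep(120)          # then poll again every couple of minutes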
print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 1014 print(f"\n🔧 Update your config.yaml with:") 1015 print(f" user_id: \"{user_data.get('id')}\"") 1016 return user_data 1017 else: 1018 print("❌ Failed to get user information") 1019 print(f"Response: {response}") 1020 return None 1021 1022 except Exception as e: 1023 print(f"Error getting user info: {e}") 1024 return None 1025 1026def test_search_mentions(): 1027 """Test the search-based mention detection.""" 1028 try: 1029 client = create_x_client() 1030 1031 # First get our username 1032 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1033 if not user_info or "data" not in user_info: 1034 print("❌ Could not get username") 1035 return 1036 1037 username = user_info["data"]["username"] 1038 print(f"🔍 Searching for mentions of @{username}") 1039 1040 mentions = client.search_mentions(username, max_results=5) 1041 1042 if mentions: 1043 print(f"✅ Found {len(mentions)} mentions via search:") 1044 for mention in mentions: 1045 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 1046 else: 1047 print("No mentions found via search") 1048 1049 except Exception as e: 1050 print(f"Search test failed: {e}") 1051 1052def test_fetch_and_queue(): 1053 """Test the single-pass fetch and queue function.""" 1054 try: 1055 client = create_x_client() 1056 1057 # Get our username 1058 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1059 if not user_info or "data" not in user_info: 1060 print("❌ Could not get username") 1061 return 1062 1063 username = user_info["data"]["username"] 1064 print(f"🔄 Fetching and queueing mentions for @{username}") 1065 1066 # Show current state 1067 last_seen = load_last_seen_id() 1068 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 1069 1070 # Fetch and queue 1071 new_count = fetch_and_queue_mentions(username) 1072 1073 if new_count > 0: 1074 print(f"✅ Queued {new_count} new mentions") 1075 print(f"📁 Check ./x_queue/ directory for queued mentions") 1076 1077 # Show updated state 1078 new_last_seen = load_last_seen_id() 1079 print(f"📍 Updated last seen ID: {new_last_seen}") 1080 else: 1081 print("ℹ️ No new mentions to queue") 1082 1083 except Exception as e: 1084 print(f"Fetch and queue test failed: {e}") 1085 1086def test_thread_context(): 1087 """Test thread context retrieval from a queued mention.""" 1088 try: 1089 import json 1090 1091 # Find a queued mention file 1092 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1093 if not queue_files: 1094 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1095 return 1096 1097 # Read the first mention 1098 mention_file = queue_files[0] 1099 with open(mention_file, 'r') as f: 1100 mention_data = json.load(f) 1101 1102 mention = mention_data['mention'] 1103 print(f"📄 Using mention: {mention.get('id')}") 1104 print(f"📝 Text: {mention.get('text')}") 1105 1106 # Check if it has a conversation_id 1107 conversation_id = mention.get('conversation_id') 1108 if not conversation_id: 1109 print("❌ No conversation_id found in mention. 
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        client = create_x_client()
        thread_data = client.get_thread_context(conversation_id)

        if thread_data:
            tweets = thread_data.get('tweets', [])
            print(f"✅ Retrieved thread with {len(tweets)} tweets")

            # Convert to YAML
            yaml_thread = thread_to_yaml_string(thread_data)

            # Save thread context for inspection
            thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
            with open(thread_file, 'w') as f:
                f.write(yaml_thread)

            print(f"💾 Saved thread context to: {thread_file}")
            print("\n📋 Thread preview:")
            print(yaml_thread)
        else:
            print("❌ Failed to retrieve thread context")

    except Exception as e:
        print(f"Thread context test failed: {e}")

def test_letta_integration(agent_id: str = None):
    """Test sending X thread context to Letta agent."""
    try:
        from letta_client import Letta
        import json
        import yaml

        # Load full config to access letta section
        try:
            with open("config.yaml", 'r') as f:
                full_config = yaml.safe_load(f)

            letta_config = full_config.get('letta', {})
            api_key = letta_config.get('api_key')
            config_agent_id = letta_config.get('agent_id')

            # Use agent_id from config if not provided as parameter
            if not agent_id:
                if config_agent_id:
                    agent_id = config_agent_id
                    print(f"ℹ️ Using agent_id from config: {agent_id}")
                else:
                    print("❌ No agent_id found in config.yaml")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    agent_id: your-agent-id")
                    return
            else:
                print(f"ℹ️ Using provided agent_id: {agent_id}")

            if not api_key:
                # Try loading from environment as fallback
                import os
                api_key = os.getenv('LETTA_API_KEY')
                if not api_key:
                    print("❌ LETTA_API_KEY not found in config.yaml or environment")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    api_key: your-letta-api-key")
                    return
                else:
                    print("ℹ️ Using LETTA_API_KEY from environment")
            else:
                print("ℹ️ Using LETTA_API_KEY from config.yaml")

        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return

        letta_client = Letta(token=api_key, timeout=600)
        print(f"🤖 Connected to Letta, using agent: {agent_id}")

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        conversation_id = mention.get('conversation_id')

        if not conversation_id:
            print("❌ No conversation_id found in mention.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        x_client = create_x_client()
        thread_data = x_client.get_thread_context(conversation_id)

        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        # Convert to YAML
        yaml_thread = thread_to_yaml_string(thread_data)

        # Create prompt for the agent
        prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.

Here is the thread context:

{yaml_thread}

Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""

        prompt_char_count = len(prompt)
        print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")

        # Print the prompt in a rich panel
        rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))

        # Send to Letta agent using streaming
        message_stream = letta_client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": prompt}],
            stream_tokens=False,
            max_steps=10
        )

        print("🔄 Streaming response from agent...")
        response_text = ""

        for chunk in message_stream:
            print(chunk)
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'assistant_message':
                    print(f"🤖 Agent response: {chunk.content}")
                    response_text = chunk.content
                elif chunk.message_type == 'reasoning_message':
                    print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
                elif chunk.message_type == 'tool_call_message':
                    print(f"🔧 Agent tool call: {chunk.tool_call.name}")

        if response_text:
            print(f"\n✅ Agent generated response:")
            print(f"📝 Response: {response_text}")
        else:
            print("❌ No response generated by agent")

    except Exception as e:
        print(f"Letta integration test failed: {e}")
        import traceback
        traceback.print_exc()

def test_x_client():
    """Test the X client by fetching mentions."""
    try:
        client = create_x_client()
        mentions = client.get_mentions(max_results=5)

        if mentions:
            print(f"Successfully retrieved {len(mentions)} mentions:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
        else:
            print("No mentions retrieved")

    except Exception as e:
        print(f"Test failed: {e}")

def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not Bearer token.
    Current Bearer token is Application-Only which can't post.
    """
    try:
        client = create_x_client()

        # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
        cameron_post_id = "1950690566909710618"

        # Simple reply message
        reply_text = "Hello from void! 🤖 Testing X integration."

        print(f"Attempting to reply to post {cameron_post_id}")
        print(f"Reply text: {reply_text}")
        print("\nNOTE: This will fail with current Bearer token (Application-Only)")
        print("Posting requires OAuth User Context authentication")

        result = client.post_reply(reply_text, cameron_post_id)

        if result:
            print(f"✅ Successfully posted reply!")
            print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
        else:
            print("❌ Failed to post reply (expected with current auth)")

    except Exception as e:
        print(f"Reply failed: {e}")

def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
    """
    Process an X mention and generate a reply using the Letta agent.
    Similar to bsky.py process_mention but for X/Twitter.

    Args:
        void_agent: The Letta agent instance
        x_client: The X API client
        mention_data: The mention data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, don't actually post to X

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
    """
    try:
        logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")

        # Extract mention details
        if isinstance(mention_data, dict):
            # Handle both raw mention and queued mention formats
            if 'mention' in mention_data:
                mention = mention_data['mention']
            else:
                mention = mention_data
        else:
            mention = mention_data

        mention_id = mention.get('id')
        mention_text = mention.get('text', '')
        author_id = mention.get('author_id')
        conversation_id = mention.get('conversation_id')
        in_reply_to_user_id = mention.get('in_reply_to_user_id')
        referenced_tweets = mention.get('referenced_tweets', [])

        # Check downrank list - only respond to downranked users 10% of the time
        downrank_users = load_downrank_users()
        if not should_respond_to_downranked_user(str(author_id), downrank_users):
            logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
            return "no_reply"

        # Enhanced conversation tracking for debug - especially important for Grok handling
        logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
        logger.info(f"   Author ID: {author_id}")
        logger.info(f"   Conversation ID: {conversation_id}")
        logger.info(f"   In Reply To User ID: {in_reply_to_user_id}")
        logger.info(f"   Referenced Tweets: {len(referenced_tweets)} items")
        for i, ref in enumerate(referenced_tweets[:3]):  # Log first 3 referenced tweets
            logger.info(f"   Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
        logger.info(f"   Text preview: {mention_text[:100]}...")

        if not conversation_id:
            logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
            return None

        # Get thread context with caching enabled for efficiency
        # Use mention_id as until_id to exclude tweets that occurred after this mention
        try:
            thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
            if not thread_data:
                logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
                return False

            # If this mention references a specific tweet, ensure we have that tweet in context
            if referenced_tweets:
                for ref in referenced_tweets:
                    if ref.get('type') == 'replied_to':
                        ref_id = ref.get('id')
                        # Check if the referenced tweet is in our thread data
                        thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
                        if ref_id and ref_id not in thread_tweet_ids:
                            logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
                            try:
                                # Fetch the missing referenced tweet directly
                                endpoint = f"/tweets/{ref_id}"
                                params = {
                                    "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                                    "user.fields": "id,name,username",
                                    "expansions": "author_id"
                                }
                                response = x_client._make_request(endpoint, params)
                                if response and "data" in response:
                                    missing_tweet = response["data"]
                                    if missing_tweet.get('conversation_id') == conversation_id:
                                        # Add to thread data
                                        if 'tweets' not in thread_data:
                                            thread_data['tweets'] = []
                                        thread_data['tweets'].append(missing_tweet)

                                        # Add user data if available
                                        if "includes" in response and "users" in response["includes"]:
                                            if 'users' not in thread_data:
                                                thread_data['users'] = {}
                                            for user in response["includes"]["users"]:
                                                thread_data['users'][user["id"]] = user

                                        logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
                                    else:
                                        logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
                            except Exception as e:
                                logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")

            # Enhanced thread context debugging
            logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
            thread_posts = thread_data.get('tweets', [])
            thread_users = thread_data.get('users', {})
            logger.info(f"   Posts in thread: {len(thread_posts)}")
            logger.info(f"   Users in thread: {len(thread_users)}")

            # Log thread participants for Grok detection
            for user_id, user_info in thread_users.items():
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')
                is_verified = user_info.get('verified', False)
                logger.info(f"   User {user_id}: @{username} ({name}) verified={is_verified}")

                # Special logging for Grok or AI-related users
                if 'grok' in username.lower() or 'grok' in name.lower():
                    logger.info(f"   🤖 DETECTED GROK USER: @{username} ({name})")

            # Log conversation structure
            for i, post in enumerate(thread_posts[:5]):  # Log first 5 posts
                post_id = post.get('id')
                post_author = post.get('author_id')
                post_text = post.get('text', '')[:50]
                is_reply = 'in_reply_to_user_id' in post
                logger.info(f"   Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")

        except Exception as e:
            logger.error(f"❌ Error getting thread context: {e}")
            return False

        # Convert to YAML string
        thread_context = thread_to_yaml_string(thread_data)
        logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")

        # Save comprehensive conversation data for debugging
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
            debug_dir.mkdir(parents=True, exist_ok=True)

            # Save raw thread data (JSON)
            with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
f"thread_data_{mention_id}.json", 'w') as f: 1464 json.dump(thread_data, f, indent=2) 1465 1466 # Save YAML thread context 1467 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1468 f.write(thread_context) 1469 1470 # Save mention processing debug info 1471 debug_info = { 1472 'processed_at': datetime.now().isoformat(), 1473 'mention_id': mention_id, 1474 'conversation_id': conversation_id, 1475 'author_id': author_id, 1476 'in_reply_to_user_id': in_reply_to_user_id, 1477 'referenced_tweets': referenced_tweets, 1478 'thread_stats': { 1479 'total_posts': len(thread_posts), 1480 'total_users': len(thread_users), 1481 'yaml_length': len(thread_context) 1482 }, 1483 'users_in_conversation': { 1484 user_id: { 1485 'username': user_info.get('username'), 1486 'name': user_info.get('name'), 1487 'verified': user_info.get('verified', False), 1488 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1489 } 1490 for user_id, user_info in thread_users.items() 1491 } 1492 } 1493 1494 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1495 json.dump(debug_info, f, indent=2) 1496 1497 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1498 1499 except Exception as debug_error: 1500 logger.warning(f"Failed to save debug data: {debug_error}") 1501 # Continue processing even if debug save fails 1502 1503 # Check for #voidstop 1504 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1505 logger.info("Found #voidstop, skipping this mention") 1506 return True 1507 1508 # Ensure X user blocks are attached 1509 try: 1510 ensure_x_user_blocks_attached(thread_data, void_agent.id) 1511 except Exception as e: 1512 logger.warning(f"Failed to ensure X user blocks: {e}") 1513 # Continue without user blocks rather than failing completely 1514 1515 # Create prompt for Letta agent 1516 author_info = thread_data.get('users', {}).get(author_id, {}) 1517 author_username = author_info.get('username', 'unknown') 1518 author_name = author_info.get('name', author_username) 1519 1520 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1521 1522MOST RECENT POST (the mention you're responding to): 1523"{mention_text}" 1524 1525FULL THREAD CONTEXT: 1526```yaml 1527{thread_context} 1528``` 1529 1530The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1531 1532If you need to update user information, use the x_user_* tools. 

To reply, use the add_post_to_x_thread tool:
- Each call creates one post (max 280 characters)
- For most responses, a single call is sufficient
- Only use multiple calls for threaded replies when:
  * The topic requires extended explanation that cannot fit in 280 characters
  * You're explicitly asked for a detailed/long response
  * The conversation naturally benefits from a structured multi-part answer
- Avoid unnecessary threads - be concise when possible"""

        # Log mention processing
        title = f"X MENTION FROM @{author_username}"
        print(f"\n{title}")
        print(f" {'─' * len(title)}")
        for line in mention_text.split('\n'):
            print(f"  {line}")

        # Send to Letta agent
        from config_loader import get_letta_config
        from letta_client import Letta

        config = get_letta_config()
        letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

        prompt_char_count = len(prompt)
        logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")

        try:
            # Use streaming to avoid timeout errors
            message_stream = letta_client.agents.messages.create_stream(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}],
                stream_tokens=False,
                max_steps=100
            )

            # Collect streaming response (simplified version of bsky.py logic)
            all_messages = []
            for chunk in message_stream:
                if hasattr(chunk, 'message_type'):
                    if chunk.message_type == 'reasoning_message':
                        print("\n◆ Reasoning")
                        print("  ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f"  {line}")
                    elif chunk.message_type == 'tool_call_message':
                        tool_name = chunk.tool_call.name
                        if tool_name == 'add_post_to_x_thread':
                            try:
                                args = json.loads(chunk.tool_call.arguments)
                                text = args.get('text', '')
                                if text:
                                    print("\n✎ X Post")
                                    print("  ────────")
                                    for line in text.split('\n'):
                                        print(f"  {line}")
                            except Exception:
                                pass
                        elif tool_name == 'halt_activity':
                            logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
                            if queue_filepath and queue_filepath.exists():
                                queue_filepath.unlink()
                                logger.info(f"Deleted queue file: {queue_filepath.name}")
                            logger.info("=== X BOT TERMINATED BY AGENT ===")
                            exit(0)
                    elif chunk.message_type == 'tool_return_message':
                        tool_name = chunk.name
                        status = chunk.status
                        if status == 'success' and tool_name == 'add_post_to_x_thread':
                            print("\n✓ X Post Queued")
                            print("  ──────────────")
                            print("  Post queued successfully")
                    elif chunk.message_type == 'assistant_message':
                        print("\n▶ Assistant Response")
                        print("  ──────────────────")
                        for line in chunk.content.split('\n'):
                            print(f"  {line}")

                all_messages.append(chunk)
                if str(chunk) == 'done':
                    break

            # Convert streaming response for compatibility
            message_response = type('StreamingResponse', (), {
                'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
            })()

        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            raise

        # Extract successful add_post_to_x_thread tool calls
        reply_candidates = []
        tool_call_results = {}
        ignored_notification = False
        ack_note = None  # Track any note from annotate_ack tool

        # First pass: collect tool return statuses
        # First pass: collect tool return statuses
        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_x_thread':
                    tool_call_results[message.tool_call_id] = message.status
                elif message.name == 'ignore_notification':
                    if message.status == 'success':
                        ignored_notification = True
                        logger.info("🚫 X notification ignored")

        # Second pass: collect successful tool calls
        for message in message_response.messages:
            if hasattr(message, 'tool_call') and message.tool_call:
                # Collect annotate_ack tool calls
                if message.tool_call.name == 'annotate_ack':
                    try:
                        args = json.loads(message.tool_call.arguments)
                        note = args.get('note', '')
                        if note:
                            ack_note = note
                            logger.debug(f"Found annotate_ack with note: {note[:50]}...")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse annotate_ack arguments: {e}")

                # Collect add_post_to_x_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_x_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            if reply_text:
                                reply_candidates.append(reply_text)
                                logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")

        # Save agent response data to debug folder
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
            # Make sure the directory exists in case the earlier debug save failed
            debug_dir.mkdir(parents=True, exist_ok=True)

            # Save complete agent interaction
            agent_response_data = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'prompt_sent': prompt,
                'reply_candidates': reply_candidates,
                'ignored_notification': ignored_notification,
                'ack_note': ack_note,
                'tool_call_results': tool_call_results,
                'all_messages': []
            }

            # Convert messages to serializable format
            for message in message_response.messages:
                msg_data = {
                    'message_type': getattr(message, 'message_type', 'unknown'),
                    'content': getattr(message, 'content', ''),
                    'reasoning': getattr(message, 'reasoning', ''),
                    'status': getattr(message, 'status', ''),
                    'name': getattr(message, 'name', ''),
                }

                if hasattr(message, 'tool_call') and message.tool_call:
                    msg_data['tool_call'] = {
                        'name': message.tool_call.name,
                        'arguments': message.tool_call.arguments,
                        'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
                    }

                agent_response_data['all_messages'].append(msg_data)

            with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
                json.dump(agent_response_data, f, indent=2)

            logger.info("💾 Saved agent response debug data")

        except Exception as debug_error:
            logger.warning(f"Failed to save agent response debug data: {debug_error}")

        # Handle conflicts
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
            return False
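        # Outcome summary for the caller (load_and_process_queued_x_mentions):
        #   True      -> replied (or testing mode), the queue file is removed
        #   "ignored" -> agent explicitly ignored the mention, the file is deleted
        #   False     -> nothing was posted, the file stays queued for another pass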
calls, posting to X") 1721 1722 if len(reply_candidates) == 1: 1723 content = reply_candidates[0] 1724 title = f"Reply to @{author_username}" 1725 else: 1726 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1727 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1728 1729 print(f"\n{title}") 1730 print(f" {'' * len(title)}") 1731 for line in content.split('\n'): 1732 print(f" {line}") 1733 1734 if testing_mode: 1735 logger.info("TESTING MODE: Skipping actual X post") 1736 return True 1737 else: 1738 # Post to X using thread approach 1739 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1740 if success: 1741 logger.info(f"Successfully replied to @{author_username} on X") 1742 1743 # Acknowledge the post we're replying to 1744 try: 1745 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1746 if ack_result: 1747 if ack_note: 1748 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1749 else: 1750 logger.info(f"Successfully acknowledged X post from @{author_username}") 1751 else: 1752 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1753 except Exception as e: 1754 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1755 # Don't fail the entire operation if acknowledgment fails 1756 1757 return True 1758 else: 1759 logger.error(f"Failed to send reply to @{author_username} on X") 1760 return False 1761 else: 1762 if ignored_notification: 1763 logger.info(f"X mention from @{author_username} was explicitly ignored") 1764 return "ignored" 1765 else: 1766 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1767 return False # Keep in queue for retry instead of removing 1768 1769 except Exception as e: 1770 logger.error(f"Error processing X mention: {e}") 1771 return False 1772 1773def acknowledge_x_post(x_client, post_id, note=None): 1774 """ 1775 Acknowledge an X post that we replied to. 1776 Uses the same Bluesky client and uploads to the void data repository on atproto, 1777 just like Bluesky acknowledgments. 1778 1779 Args: 1780 x_client: XClient instance (not used, kept for compatibility) 1781 post_id: The X post ID we're acknowledging 1782 note: Optional note to include with the acknowledgment 1783 1784 Returns: 1785 True if successful, False otherwise 1786 """ 1787 try: 1788 # Use Bluesky client to upload acks to the void data repository on atproto 1789 bsky_client = bsky_utils.default_login() 1790 1791 # Create a synthetic URI and CID for the X post 1792 # X posts don't have atproto URIs/CIDs, so we create identifiers 1793 post_uri = f"x://twitter.com/post/{post_id}" 1794 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1795 1796 # Use the same acknowledge_post function as Bluesky 1797 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1798 1799 if ack_result: 1800 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1801 return True 1802 else: 1803 logger.error(f"Failed to acknowledge X post {post_id}") 1804 return False 1805 1806 except Exception as e: 1807 logger.error(f"Error acknowledging X post {post_id}: {e}") 1808 return False 1809 1810def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1811 """ 1812 Post a series of replies to X, threading them properly. 
def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
    """
    Post a series of replies to X, threading them properly.

    Args:
        x_client: XClient instance
        in_reply_to_tweet_id: The original tweet ID to reply to
        reply_messages: List of reply text strings

    Returns:
        True if successful, False otherwise
    """
    try:
        current_reply_id = in_reply_to_tweet_id

        for i, reply_text in enumerate(reply_messages):
            logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")

            result = x_client.post_reply(reply_text, current_reply_id)

            if result and 'data' in result:
                new_tweet_id = result['data']['id']
                logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
                # For threading, the next reply should reply to this one
                current_reply_id = new_tweet_id
            else:
                logger.error(f"Failed to post X reply {i+1}")
                return False

        return True

    except Exception as e:
        logger.error(f"Error posting X thread replies: {e}")
        return False
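# Threading sketch (illustrative): given reply_messages ["part 1", "part 2"],
# "part 1" is posted in reply to the original mention, and "part 2" is posted
# in reply to the tweet id returned for "part 1", so the replies form a chain
# rather than all hanging off the original mention.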
def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
    """
    Load and process all X mentions from the queue.
    Similar to bsky.py load_and_process_queued_notifications but for X.
    """
    try:
        # Get all X mention files in queue directory
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))

        if not queue_files:
            return

        # Load file metadata and sort by creation time (chronological order)
        file_metadata = []
        for filepath in queue_files:
            try:
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)
                mention_data = queue_data.get('mention', queue_data)
                created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z')  # Default to epoch if missing
                file_metadata.append((created_at, filepath))
            except Exception as e:
                logger.warning(f"Error reading queue file {filepath.name}: {e}")
                # Add with default timestamp so it still gets processed
                file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))

        # Sort by creation time (oldest first)
        file_metadata.sort(key=lambda x: x[0])

        logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")

        for i, (created_at, filepath) in enumerate(file_metadata, 1):
            logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")

            try:
                # Load mention data
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)

                mention_data = queue_data.get('mention', queue_data)

                # Process the mention
                success = process_x_mention(void_agent, x_client, mention_data,
                                            queue_filepath=filepath, testing_mode=testing_mode)

            except XRateLimitError:
                logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
                break

            except Exception as e:
                logger.error(f"Error processing X queue file {filepath.name}: {e}")
                continue

            # Handle file based on processing result
            if success is True:  # explicit check: "ignored"/"no_reply" are truthy strings handled below
                if testing_mode:
                    logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
                else:
                    filepath.unlink()
                    logger.info(f"Successfully processed and removed X file: {filepath.name}")

                # Mark as processed
                processed_mentions = load_processed_mentions()
                processed_mentions.add(mention_data.get('id'))
                save_processed_mentions(processed_mentions)

            elif success is None:  # Move to error directory
                error_dir = X_QUEUE_DIR / "errors"
                error_dir.mkdir(exist_ok=True)
                error_path = error_dir / filepath.name
                filepath.rename(error_path)
                logger.warning(f"Moved X file {filepath.name} to errors directory")

            elif success == "no_reply":  # Move to no_reply directory
                no_reply_dir = X_QUEUE_DIR / "no_reply"
                no_reply_dir.mkdir(exist_ok=True)
                no_reply_path = no_reply_dir / filepath.name
                filepath.rename(no_reply_path)
                logger.info(f"Moved X file {filepath.name} to no_reply directory")

            elif success == "ignored":  # Delete ignored notifications
                filepath.unlink()
                logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")

            else:
                logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")

    except Exception as e:
        logger.error(f"Error loading queued X mentions: {e}")


def process_x_notifications(void_agent, x_client, testing_mode=False):
    """
    Fetch new X mentions, queue them, and process the queue.
    Similar to bsky.py process_notifications but for X.
    """
    try:
        # Get username for fetching mentions
        user_info = x_client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            logger.error("Could not get username for X mentions")
            return

        username = user_info["data"]["username"]

        # Fetch and queue new mentions
        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            logger.info(f"Found {new_count} new X mentions to process")

        # Process the entire queue
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing X notifications: {e}")
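# Queue layout note (inferred from the readers above): each x_mention_*.json file
# in X_QUEUE_DIR either wraps the mention under a top-level "mention" key or is the
# mention object itself; a "created_at" ISO timestamp on the mention drives the
# oldest-first processing order, with missing timestamps treated as the epoch.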
def periodic_user_block_cleanup(client, agent_id: str) -> None:
    """
    Detach all user blocks from the agent to prevent memory bloat.
    This should be called periodically to ensure clean state.
    """
    try:
        # Get all blocks attached to the agent
        attached_blocks = client.agents.blocks.list(agent_id=agent_id)

        user_blocks_to_detach = []
        for block in attached_blocks:
            if hasattr(block, 'label') and block.label.startswith('user_'):
                user_blocks_to_detach.append({
                    'label': block.label,
                    'id': block.id
                })

        if not user_blocks_to_detach:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach")

        # Detach each user block
        for block in user_blocks_to_detach:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=block['id']
                )
                logger.debug(f"Detached user block: {block['label']}")
            except Exception as e:
                logger.error(f"Failed to detach user block {block['label']}: {e}")

        logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")


def initialize_x_void():
    """Initialize the void agent for X operations."""
    logger.info("Starting void agent initialization for X...")

    from config_loader import get_letta_config
    from letta_client import Letta

    # Get config
    config = get_letta_config()
    client = Letta(token=config['api_key'], timeout=config['timeout'])
    agent_id = config['agent_id']

    try:
        void_agent = client.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        raise e

    # Clean up all user blocks at startup
    logger.info("🧹 Cleaning up user blocks at X startup...")
    periodic_user_block_cleanup(client, agent_id)

    # Ensure correct tools are attached for X
    logger.info("Configuring tools for X platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('x', void_agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Log agent details
    logger.info(f"X Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")

    return void_agent
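# Minimal usage sketch (assumes the Letta and X credentials referenced by
# config_loader and create_x_client are configured):
#
#   void_agent = initialize_x_void()
#   x_client = create_x_client()
#   process_x_notifications(void_agent, x_client, testing_mode=True)
#
# This is effectively what one cycle of x_main_loop() below does, minus the
# scheduling and periodic block cleanup.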
def x_main_loop(testing_mode=False, cleanup_interval=10):
    """
    Main X bot loop that continuously monitors for mentions and processes them.
    Similar to bsky.py main() but for X/Twitter.

    Args:
        testing_mode: If True, don't actually post to X
        cleanup_interval: Run user block cleanup every N cycles (0 to disable)
    """
    import time
    from time import sleep
    from config_loader import get_letta_config
    from letta_client import Letta

    logger.info("=== STARTING X VOID BOT ===")

    # Initialize void agent
    void_agent = initialize_x_void()
    logger.info(f"X void agent initialized: {void_agent.id}")

    # Initialize X client
    x_client = create_x_client()
    logger.info("Connected to X API")

    # Get Letta client for periodic cleanup
    config = get_letta_config()
    letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

    # Main loop
    FETCH_DELAY_SEC = 120  # Check every 2 minutes for X mentions (raised from 60s to conserve API calls)
    logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    if cleanup_interval > 0:
        logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
    else:
        logger.info("User block cleanup disabled")

    cycle_count = 0
    start_time = time.time()

    while True:
        try:
            cycle_count += 1
            logger.info(f"=== X CYCLE {cycle_count} ===")

            # Process X notifications (fetch, queue, and process)
            process_x_notifications(void_agent, x_client, testing_mode)

            # Run periodic cleanup every N cycles
            if cleanup_interval > 0 and cycle_count % cleanup_interval == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                periodic_user_block_cleanup(letta_client, void_agent.id)

            # Log cycle completion
            elapsed_time = time.time() - start_time
            logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")

            sleep(FETCH_DELAY_SEC)

        except KeyboardInterrupt:
            elapsed_time = time.time() - start_time
            logger.info("=== X BOT STOPPED BY USER ===")
            logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_DELAY_SEC * 2)
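# Typical invocations (see the __main__ dispatch at the bottom of this file):
#   python x.py bot --test        # full loop, no posts actually sent
#   python x.py queue             # fetch and queue mentions only
#   python x.py process --test    # drain the existing queue without fetching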
def process_queue_only(testing_mode=False):
    """
    Process all queued X mentions without fetching new ones.
    Useful for rate limit management - queue first, then process separately.

    Args:
        testing_mode: If True, don't actually post to X and keep queue files
    """
    logger.info("=== PROCESSING X QUEUE ONLY ===")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    try:
        # Initialize void agent
        void_agent = initialize_x_void()
        logger.info(f"X void agent initialized: {void_agent.id}")

        # Initialize X client
        x_client = create_x_client()
        logger.info("Connected to X API")

        # Process the queue without fetching new mentions
        logger.info("Processing existing X queue...")
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

        logger.info("=== X QUEUE PROCESSING COMPLETE ===")

    except Exception as e:
        logger.error(f"Error processing X queue: {e}")
        raise


def x_notification_loop():
    """
    DEPRECATED: Old X notification loop using search-based mention detection.
    Use x_main_loop() instead for the full bot experience.
    """
    logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
    x_main_loop()
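# CLI entry point: most subcommands below are dispatched on raw sys.argv so they
# stay argument-free; only "bot" builds an argparse parser for its flags.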
if __name__ == "__main__":
    import sys
    import argparse

    if len(sys.argv) > 1:
        if sys.argv[1] == "bot":
            # Main bot with optional --test flag and cleanup interval
            parser = argparse.ArgumentParser(description='X Void Bot')
            parser.add_argument('command', choices=['bot'])
            parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
            parser.add_argument('--cleanup-interval', type=int, default=10,
                                help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
            args = parser.parse_args()
            x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval)
        elif sys.argv[1] == "loop":
            x_notification_loop()
        elif sys.argv[1] == "reply":
            reply_to_cameron_post()
        elif sys.argv[1] == "me":
            get_my_user_info()
        elif sys.argv[1] == "search":
            test_search_mentions()
        elif sys.argv[1] == "queue":
            test_fetch_and_queue()
        elif sys.argv[1] == "thread":
            test_thread_context()
        elif sys.argv[1] == "process":
            # Process all queued mentions with optional --test flag
            testing_mode = "--test" in sys.argv
            process_queue_only(testing_mode=testing_mode)
        elif sys.argv[1] == "letta":
            # Use specific agent ID if provided, otherwise use from config
            agent_id = sys.argv[2] if len(sys.argv) > 2 else None
            test_letta_integration(agent_id)
        elif sys.argv[1] == "downrank":
            # View or manage downrank list
            if len(sys.argv) > 2 and sys.argv[2] == "list":
                downrank_users = load_downrank_users()
                if downrank_users:
                    print(f"📋 Downrank users ({len(downrank_users)} total):")
                    for user_id in sorted(downrank_users):
                        print(f"  - {user_id}")
                else:
                    print("📋 No downrank users configured")
            else:
                print("Usage: python x.py downrank list")
                print("  list - Show all downranked user IDs")
                print(f"  Edit {X_DOWNRANK_USERS_FILE} to modify the list")
        else:
            print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
            print("  bot      - Run the main X bot (use --test for testing mode)")
            print("             Example: python x.py bot --test")
            print("  queue    - Fetch and queue mentions only (no processing)")
            print("  process  - Process all queued mentions only (no fetching)")
            print("             Example: python x.py process --test")
            print("  downrank - Manage downrank users (10% response rate)")
            print("             Example: python x.py downrank list")
            print("  loop     - Run the old notification monitoring loop (deprecated)")
            print("  reply    - Reply to Cameron's specific post")
            print("  me       - Get authenticated user info and correct user ID")
            print("  search   - Test search-based mention detection")
            print("  thread   - Test thread context retrieval from queued mention")
            print("  letta    - Test sending thread context to Letta agent")
            print("             Optional: python x.py letta <agent-id>")
    else:
        test_x_client()