a digital person for bluesky
1import os 2import logging 3import requests 4import yaml 5import json 6import hashlib 7import random 8import time 9from typing import Optional, Dict, Any, List, Set 10from datetime import datetime 11from pathlib import Path 12from requests_oauthlib import OAuth1 13from rich import print as rprint 14from rich.panel import Panel 15from rich.text import Text 16 17import bsky_utils 18 19class XRateLimitError(Exception): 20 """Exception raised when X API rate limit is exceeded""" 21 pass 22 23 24# Configure logging 25logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27) 28logger = logging.getLogger("x_client") 29 30# X-specific file paths 31X_QUEUE_DIR = Path("x_queue") 32X_CACHE_DIR = Path("x_cache") 33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt") 36 37class XClient: 38 """X (Twitter) API client for fetching mentions and managing interactions.""" 39 40 def __init__(self, api_key: str, user_id: str, access_token: str = None, 41 consumer_key: str = None, consumer_secret: str = None, 42 access_token_secret: str = None): 43 self.api_key = api_key 44 self.access_token = access_token 45 self.user_id = user_id 46 self.base_url = "https://api.x.com/2" 47 48 # Check if we have OAuth 1.0a credentials 49 if (consumer_key and consumer_secret and access_token and access_token_secret): 50 # Use OAuth 1.0a for User Context 51 self.oauth = OAuth1( 52 consumer_key, 53 client_secret=consumer_secret, 54 resource_owner_key=access_token, 55 resource_owner_secret=access_token_secret 56 ) 57 self.headers = {"Content-Type": "application/json"} 58 self.auth_method = "oauth1a" 59 logger.info("Using OAuth 1.0a User Context authentication for X API") 60 elif access_token: 61 # Use OAuth 2.0 Bearer token for User Context 62 self.oauth = None 63 self.headers = { 64 "Authorization": f"Bearer {access_token}", 65 "Content-Type": "application/json" 66 } 67 self.auth_method = "oauth2_user" 68 logger.info("Using OAuth 2.0 User Context access token for X API") 69 else: 70 # Use Application-Only Bearer token 71 self.oauth = None 72 self.headers = { 73 "Authorization": f"Bearer {api_key}", 74 "Content-Type": "application/json" 75 } 76 self.auth_method = "bearer" 77 logger.info("Using Application-Only Bearer token for X API") 78 79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 80 """Make a request to the X API with proper error handling and exponential backoff.""" 81 url = f"{self.base_url}{endpoint}" 82 83 for attempt in range(max_retries): 84 try: 85 if method.upper() == "GET": 86 if self.oauth: 87 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 88 else: 89 response = requests.get(url, headers=self.headers, params=params) 90 elif method.upper() == "POST": 91 if self.oauth: 92 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 93 else: 94 response = requests.post(url, headers=self.headers, json=data) 95 else: 96 raise ValueError(f"Unsupported HTTP method: {method}") 97 98 response.raise_for_status() 99 return response.json() 100 101 except requests.exceptions.HTTPError as e: 102 if response.status_code == 401: 103 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 104 logger.error(f"Response: {response.text}") 105 return None # Don't retry auth failures 106 elif response.status_code == 403: 107 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 108 logger.error(f"Response: {response.text}") 109 return None # Don't retry permission failures 110 elif response.status_code == 429: 111 if attempt < max_retries - 1: 112 # Exponential backoff: 60s, 120s, 240s 113 backoff_time = 60 * (2 ** attempt) 114 logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 115 logger.error(f"Response: {response.text}") 116 time.sleep(backoff_time) 117 continue 118 else: 119 logger.error("X API rate limit exceeded - max retries reached") 120 logger.error(f"Response: {response.text}") 121 raise XRateLimitError("X API rate limit exceeded") 122 else: 123 if attempt < max_retries - 1: 124 # Exponential backoff for other HTTP errors too 125 backoff_time = 30 * (2 ** attempt) 126 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 127 logger.error(f"Response: {response.text}") 128 time.sleep(backoff_time) 129 continue 130 else: 131 logger.error(f"X API request failed after {max_retries} attempts: {e}") 132 logger.error(f"Response: {response.text}") 133 return None 134 135 except Exception as e: 136 if attempt < max_retries - 1: 137 backoff_time = 15 * (2 ** attempt) 138 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 139 time.sleep(backoff_time) 140 continue 141 else: 142 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}") 143 return None 144 145 return None 146 147 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]: 148 """ 149 Fetch mentions for the configured user. 150 151 Args: 152 since_id: Minimum Post ID to include (for getting newer mentions) 153 max_results: Number of results to return (5-100) 154 155 Returns: 156 List of mention objects or None if request failed 157 """ 158 endpoint = f"/users/{self.user_id}/mentions" 159 params = { 160 "max_results": min(max(max_results, 5), 100), # Ensure within API limits 161 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 162 "user.fields": "id,name,username", 163 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 164 } 165 166 if since_id: 167 params["since_id"] = since_id 168 169 logger.info(f"Fetching mentions for user {self.user_id}") 170 response = self._make_request(endpoint, params) 171 172 if response: 173 logger.debug(f"X API response: {response}") 174 175 if response and "data" in response: 176 mentions = response["data"] 177 logger.info(f"Retrieved {len(mentions)} mentions") 178 return mentions 179 else: 180 if response: 181 logger.info(f"No mentions in response. Full response: {response}") 182 else: 183 logger.warning("Request failed - no response received") 184 return [] 185 186 def get_user_info(self, user_id: str) -> Optional[Dict]: 187 """Get information about a specific user.""" 188 endpoint = f"/users/{user_id}" 189 params = { 190 "user.fields": "id,name,username,description,public_metrics" 191 } 192 193 response = self._make_request(endpoint, params) 194 return response.get("data") if response else None 195 196 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 197 """ 198 Search for mentions using the search endpoint instead of mentions endpoint. 199 This might have better rate limits than the direct mentions endpoint. 200 201 Args: 202 username: Username to search for mentions of (without @) 203 max_results: Number of results to return (10-100) 204 since_id: Only return results newer than this tweet ID 205 206 Returns: 207 List of tweets mentioning the username 208 """ 209 endpoint = "/tweets/search/recent" 210 211 # Search for mentions of the username 212 query = f"@{username}" 213 214 params = { 215 "query": query, 216 "max_results": min(max(max_results, 10), 100), 217 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 218 "user.fields": "id,name,username", 219 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 220 } 221 222 if since_id: 223 params["since_id"] = since_id 224 225 logger.info(f"Searching for mentions of @{username}") 226 response = self._make_request(endpoint, params) 227 228 if response and "data" in response: 229 tweets = response["data"] 230 logger.info(f"Found {len(tweets)} mentions via search") 231 return tweets 232 else: 233 if response: 234 logger.info(f"No mentions found via search. Response: {response}") 235 else: 236 logger.warning("Search request failed") 237 return [] 238 239 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]: 240 """ 241 Get all tweets in a conversation thread up to a specific tweet ID. 242 243 Args: 244 conversation_id: The conversation ID to fetch (should be the original tweet ID) 245 use_cache: Whether to use cached data if available 246 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID) 247 248 Returns: 249 List of tweets in the conversation, ordered chronologically 250 """ 251 # Check cache first if enabled 252 if use_cache: 253 cached_data = get_cached_thread_context(conversation_id) 254 if cached_data: 255 return cached_data 256 257 # First, get the original tweet directly since it might not appear in conversation search 258 original_tweet = None 259 try: 260 endpoint = f"/tweets/{conversation_id}" 261 params = { 262 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 263 "user.fields": "id,name,username", 264 "expansions": "author_id" 265 } 266 response = self._make_request(endpoint, params) 267 if response and "data" in response: 268 original_tweet = response["data"] 269 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}") 270 except Exception as e: 271 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}") 272 273 # Then search for all tweets in this conversation 274 endpoint = "/tweets/search/recent" 275 params = { 276 "query": f"conversation_id:{conversation_id}", 277 "max_results": 100, # Get as many as possible 278 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 279 "user.fields": "id,name,username", 280 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id", 281 "sort_order": "recency" # Get newest first, we'll reverse later 282 } 283 284 # Add until_id parameter to exclude tweets after the mention being processed 285 if until_id: 286 params["until_id"] = until_id 287 logger.info(f"Using until_id={until_id} to exclude future tweets") 288 289 logger.info(f"Fetching thread context for conversation {conversation_id}") 290 response = self._make_request(endpoint, params) 291 292 tweets = [] 293 users_data = {} 294 295 # Collect tweets from search 296 if response and "data" in response: 297 tweets.extend(response["data"]) 298 # Store user data for reference 299 if "includes" in response and "users" in response["includes"]: 300 for user in response["includes"]["users"]: 301 users_data[user["id"]] = user 302 303 # Add original tweet if we got it and it's not already in the list 304 if original_tweet: 305 tweet_ids = [t.get('id') for t in tweets] 306 if original_tweet.get('id') not in tweet_ids: 307 tweets.append(original_tweet) 308 logger.info("Added original tweet to thread context") 309 310 # Attempt to fill gaps by fetching referenced tweets that are missing 311 # This helps with X API's incomplete conversation search results 312 tweet_ids = set(t.get('id') for t in tweets) 313 missing_tweet_ids = set() 314 critical_missing_ids = set() 315 316 # Collect referenced tweet IDs, prioritizing critical ones 317 for tweet in tweets: 318 referenced_tweets = tweet.get('referenced_tweets', []) 319 for ref in referenced_tweets: 320 ref_id = ref.get('id') 321 ref_type = ref.get('type') 322 if ref_id and ref_id not in tweet_ids: 323 missing_tweet_ids.add(ref_id) 324 # Prioritize direct replies and quoted tweets over retweets 325 if ref_type in ['replied_to', 'quoted']: 326 critical_missing_ids.add(ref_id) 327 328 # For rate limit efficiency, only fetch critical missing tweets if we have many 329 if len(missing_tweet_ids) > 10: 330 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones") 331 missing_tweet_ids = critical_missing_ids 332 333 # Context sufficiency check - skip backfill if we already have enough context 334 if has_sufficient_context(tweets, missing_tweet_ids): 335 logger.info("Thread has sufficient context, skipping missing tweet backfill") 336 missing_tweet_ids = set() 337 338 # Fetch missing referenced tweets in batches (more rate-limit friendly) 339 if missing_tweet_ids: 340 missing_list = list(missing_tweet_ids) 341 342 # First, check cache for missing tweets 343 cached_tweets = get_cached_tweets(missing_list) 344 for tweet_id, cached_tweet in cached_tweets.items(): 345 if cached_tweet.get('conversation_id') == conversation_id: 346 tweets.append(cached_tweet) 347 tweet_ids.add(tweet_id) 348 logger.info(f"Retrieved missing tweet from cache: {tweet_id}") 349 350 # Add user data if available in cache 351 if cached_tweet.get('author_info'): 352 author_id = cached_tweet.get('author_id') 353 if author_id: 354 users_data[author_id] = cached_tweet['author_info'] 355 356 # Only fetch tweets that weren't found in cache 357 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets] 358 359 if uncached_ids: 360 batch_size = 100 # X API limit for bulk tweet lookup 361 362 for i in range(0, len(uncached_ids), batch_size): 363 batch_ids = uncached_ids[i:i + batch_size] 364 try: 365 endpoint = "/tweets" 366 params = { 367 "ids": ",".join(batch_ids), 368 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 369 "user.fields": "id,name,username", 370 "expansions": "author_id" 371 } 372 response = self._make_request(endpoint, params) 373 374 if response and "data" in response: 375 fetched_tweets = [] 376 batch_users_data = {} 377 378 for missing_tweet in response["data"]: 379 # Only add if it's actually part of this conversation 380 if missing_tweet.get('conversation_id') == conversation_id: 381 tweets.append(missing_tweet) 382 tweet_ids.add(missing_tweet.get('id')) 383 fetched_tweets.append(missing_tweet) 384 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}") 385 386 # Add user data if available 387 if "includes" in response and "users" in response["includes"]: 388 for user in response["includes"]["users"]: 389 users_data[user["id"]] = user 390 batch_users_data[user["id"]] = user 391 392 # Cache the newly fetched tweets 393 if fetched_tweets: 394 save_cached_tweets(fetched_tweets, batch_users_data) 395 396 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested") 397 398 # Handle partial success - log any missing tweets that weren't found 399 if response and "errors" in response: 400 for error in response["errors"]: 401 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}") 402 403 except Exception as e: 404 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}") 405 else: 406 logger.info(f"All {len(missing_list)} missing tweets found in cache") 407 408 if tweets: 409 # Filter out tweets that occur after until_id (if specified) 410 if until_id: 411 original_count = len(tweets) 412 # Convert until_id to int for comparison (Twitter IDs are sequential) 413 until_id_int = int(until_id) 414 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int] 415 filtered_count = len(tweets) 416 if original_count != filtered_count: 417 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}") 418 419 # Sort chronologically (oldest first) 420 tweets.sort(key=lambda x: x.get('created_at', '')) 421 logger.info(f"Retrieved {len(tweets)} tweets in thread") 422 423 thread_data = {"tweets": tweets, "users": users_data} 424 425 # Cache individual tweets from the thread for future backfill 426 save_cached_tweets(tweets, users_data) 427 428 # Cache the result 429 if use_cache: 430 save_cached_thread_context(conversation_id, thread_data) 431 432 return thread_data 433 else: 434 logger.warning("No tweets found for thread context") 435 return None 436 437 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 438 """ 439 Post a reply to a specific tweet. 440 441 Args: 442 reply_text: The text content of the reply 443 in_reply_to_tweet_id: The ID of the tweet to reply to 444 445 Returns: 446 Response data if successful, None if failed 447 """ 448 endpoint = "/tweets" 449 450 payload = { 451 "text": reply_text, 452 "reply": { 453 "in_reply_to_tweet_id": in_reply_to_tweet_id 454 } 455 } 456 457 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 458 result = self._make_request(endpoint, method="POST", data=payload) 459 460 if result: 461 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}") 462 return result 463 else: 464 logger.error("Failed to post reply") 465 return None 466 467def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]: 468 """Load X configuration from config file.""" 469 try: 470 with open(config_path, 'r') as f: 471 config = yaml.safe_load(f) 472 473 x_config = config.get('x', {}) 474 if not x_config.get('api_key') or not x_config.get('user_id'): 475 raise ValueError("X API key and user_id must be configured in config.yaml") 476 477 return x_config 478 except Exception as e: 479 logger.error(f"Failed to load X configuration: {e}") 480 raise 481 482def create_x_client(config_path: str = "config.yaml") -> XClient: 483 """Create and return an X client with configuration loaded from file.""" 484 config = load_x_config(config_path) 485 return XClient( 486 api_key=config['api_key'], 487 user_id=config['user_id'], 488 access_token=config.get('access_token'), 489 consumer_key=config.get('consumer_key'), 490 consumer_secret=config.get('consumer_secret'), 491 access_token_secret=config.get('access_token_secret') 492 ) 493 494def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str: 495 """ 496 Convert a mention object to a YAML string for better AI comprehension. 497 Similar to thread_to_yaml_string in bsky_utils.py 498 """ 499 # Extract relevant fields 500 simplified_mention = { 501 'id': mention.get('id'), 502 'text': mention.get('text'), 503 'author_id': mention.get('author_id'), 504 'created_at': mention.get('created_at'), 505 'in_reply_to_user_id': mention.get('in_reply_to_user_id') 506 } 507 508 # Add user information if available 509 if users_data and mention.get('author_id') in users_data: 510 user = users_data[mention.get('author_id')] 511 simplified_mention['author'] = { 512 'username': user.get('username'), 513 'name': user.get('name') 514 } 515 516 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 517 518def thread_to_yaml_string(thread_data: Dict) -> str: 519 """ 520 Convert X thread context to YAML string for AI comprehension. 521 Similar to Bluesky's thread_to_yaml_string function. 522 523 Args: 524 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 525 526 Returns: 527 YAML string representation of the thread 528 """ 529 if not thread_data or "tweets" not in thread_data: 530 return "conversation: []\n" 531 532 tweets = thread_data["tweets"] 533 users_data = thread_data.get("users", {}) 534 535 simplified_thread = { 536 "conversation": [] 537 } 538 539 for tweet in tweets: 540 # Get user info 541 author_id = tweet.get('author_id') 542 author_info = {} 543 if author_id and author_id in users_data: 544 user = users_data[author_id] 545 author_info = { 546 'username': user.get('username'), 547 'name': user.get('name') 548 } 549 550 # Build tweet object (simplified for AI consumption) 551 tweet_obj = { 552 'text': tweet.get('text'), 553 'created_at': tweet.get('created_at'), 554 'author': author_info, 555 'author_id': author_id # Include user ID for block management 556 } 557 558 simplified_thread["conversation"].append(tweet_obj) 559 560 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False) 561 562 563def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None: 564 """ 565 Ensure all users in the thread have their X user blocks attached. 566 Creates blocks with initial content including their handle if they don't exist. 567 568 Args: 569 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 570 agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id) 571 """ 572 if not thread_data or "users" not in thread_data: 573 return 574 575 try: 576 from tools.blocks import attach_x_user_blocks, x_user_note_set 577 from config_loader import get_letta_config 578 from letta_client import Letta 579 580 # Get Letta client and agent_id from config 581 config = get_letta_config() 582 client = Letta(token=config['api_key'], timeout=config['timeout']) 583 584 # Use provided agent_id or get from config 585 if agent_id is None: 586 agent_id = config['agent_id'] 587 588 # Get agent info to create a mock agent_state for the functions 589 class MockAgentState: 590 def __init__(self, agent_id): 591 self.id = agent_id 592 593 agent_state = MockAgentState(agent_id) 594 595 users_data = thread_data["users"] 596 user_ids = list(users_data.keys()) 597 598 if not user_ids: 599 return 600 601 logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}") 602 603 # Get current blocks to check which users already have blocks with content 604 current_blocks = client.agents.blocks.list(agent_id=agent_id) 605 existing_user_blocks = {} 606 607 for block in current_blocks: 608 if block.label.startswith("x_user_"): 609 user_id = block.label.replace("x_user_", "") 610 existing_user_blocks[user_id] = block 611 612 # Attach all user blocks (this will create missing ones with basic content) 613 attach_result = attach_x_user_blocks(user_ids, agent_state) 614 logger.info(f"X user block attachment result: {attach_result}") 615 616 # For newly created blocks, update with user handle information 617 for user_id in user_ids: 618 if user_id not in existing_user_blocks: 619 user_info = users_data[user_id] 620 username = user_info.get('username', 'unknown') 621 name = user_info.get('name', 'Unknown') 622 623 # Set initial content with handle information 624 initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet." 625 626 try: 627 x_user_note_set(user_id, initial_content, agent_state) 628 logger.info(f"Set initial content for X user {user_id} (@{username})") 629 except Exception as e: 630 logger.error(f"Failed to set initial content for X user {user_id}: {e}") 631 632 except Exception as e: 633 logger.error(f"Error ensuring X user blocks: {e}") 634 635 636# X Caching and Queue System Functions 637 638def load_last_seen_id() -> Optional[str]: 639 """Load the last seen mention ID for incremental fetching.""" 640 if X_LAST_SEEN_FILE.exists(): 641 try: 642 with open(X_LAST_SEEN_FILE, 'r') as f: 643 data = json.load(f) 644 return data.get('last_seen_id') 645 except Exception as e: 646 logger.error(f"Error loading last seen ID: {e}") 647 return None 648 649def save_last_seen_id(mention_id: str): 650 """Save the last seen mention ID.""" 651 try: 652 X_QUEUE_DIR.mkdir(exist_ok=True) 653 with open(X_LAST_SEEN_FILE, 'w') as f: 654 json.dump({ 655 'last_seen_id': mention_id, 656 'updated_at': datetime.now().isoformat() 657 }, f) 658 logger.debug(f"Saved last seen ID: {mention_id}") 659 except Exception as e: 660 logger.error(f"Error saving last seen ID: {e}") 661 662def load_processed_mentions() -> set: 663 """Load the set of processed mention IDs.""" 664 if X_PROCESSED_MENTIONS_FILE.exists(): 665 try: 666 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 667 data = json.load(f) 668 # Keep only recent entries (last 10000) 669 if len(data) > 10000: 670 data = data[-10000:] 671 save_processed_mentions(set(data)) 672 return set(data) 673 except Exception as e: 674 logger.error(f"Error loading processed mentions: {e}") 675 return set() 676 677def save_processed_mentions(processed_set: set): 678 """Save the set of processed mention IDs.""" 679 try: 680 X_QUEUE_DIR.mkdir(exist_ok=True) 681 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 682 json.dump(list(processed_set), f) 683 except Exception as e: 684 logger.error(f"Error saving processed mentions: {e}") 685 686def load_downrank_users() -> Set[str]: 687 """Load the set of user IDs that should be downranked (responded to 10% of the time).""" 688 try: 689 if not X_DOWNRANK_USERS_FILE.exists(): 690 return set() 691 692 downrank_users = set() 693 with open(X_DOWNRANK_USERS_FILE, 'r') as f: 694 for line in f: 695 line = line.strip() 696 # Skip empty lines and comments 697 if line and not line.startswith('#'): 698 downrank_users.add(line) 699 700 logger.info(f"Loaded {len(downrank_users)} downrank users") 701 return downrank_users 702 except Exception as e: 703 logger.error(f"Error loading downrank users: {e}") 704 return set() 705 706def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool: 707 """ 708 Check if we should respond to a downranked user. 709 Returns True 10% of the time for downranked users, True 100% of the time for others. 710 """ 711 if user_id not in downrank_users: 712 return True 713 714 # 10% chance for downranked users 715 should_respond = random.random() < 0.1 716 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 717 return should_respond 718 719def save_mention_to_queue(mention: Dict): 720 """Save a mention to the queue directory for async processing.""" 721 try: 722 mention_id = mention.get('id') 723 if not mention_id: 724 logger.error("Mention missing ID, cannot queue") 725 return 726 727 # Check if already processed 728 processed_mentions = load_processed_mentions() 729 if mention_id in processed_mentions: 730 logger.debug(f"Mention {mention_id} already processed, skipping") 731 return 732 733 # Create queue directory 734 X_QUEUE_DIR.mkdir(exist_ok=True) 735 736 # Create filename using hash (similar to Bluesky system) 737 mention_str = json.dumps(mention, sort_keys=True) 738 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 739 filename = f"x_mention_{mention_hash}.json" 740 741 queue_file = X_QUEUE_DIR / filename 742 743 # Save mention data with enhanced debugging information 744 mention_data = { 745 'mention': mention, 746 'queued_at': datetime.now().isoformat(), 747 'type': 'x_mention', 748 # Debug info for conversation tracking 749 'debug_info': { 750 'mention_id': mention.get('id'), 751 'author_id': mention.get('author_id'), 752 'conversation_id': mention.get('conversation_id'), 753 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 754 'referenced_tweets': mention.get('referenced_tweets', []), 755 'text_preview': mention.get('text', '')[:200], 756 'created_at': mention.get('created_at'), 757 'public_metrics': mention.get('public_metrics', {}), 758 'context_annotations': mention.get('context_annotations', []) 759 } 760 } 761 762 with open(queue_file, 'w') as f: 763 json.dump(mention_data, f, indent=2) 764 765 logger.info(f"Queued X mention {mention_id} -> {filename}") 766 767 except Exception as e: 768 logger.error(f"Error saving mention to queue: {e}") 769 770# X Cache Functions 771def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 772 """Load cached thread context if available.""" 773 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 774 if cache_file.exists(): 775 try: 776 with open(cache_file, 'r') as f: 777 cached_data = json.load(f) 778 # Check if cache is recent (within 1 hour) 779 from datetime import datetime, timedelta 780 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 781 if datetime.now() - cached_time < timedelta(hours=1): 782 logger.info(f"Using cached thread context for {conversation_id}") 783 return cached_data.get('thread_data') 784 except Exception as e: 785 logger.warning(f"Error loading cached thread context: {e}") 786 return None 787 788def save_cached_thread_context(conversation_id: str, thread_data: Dict): 789 """Save thread context to cache.""" 790 try: 791 X_CACHE_DIR.mkdir(exist_ok=True) 792 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 793 794 cache_data = { 795 'conversation_id': conversation_id, 796 'thread_data': thread_data, 797 'cached_at': datetime.now().isoformat() 798 } 799 800 with open(cache_file, 'w') as f: 801 json.dump(cache_data, f, indent=2) 802 803 logger.debug(f"Cached thread context for {conversation_id}") 804 except Exception as e: 805 logger.error(f"Error caching thread context: {e}") 806 807def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]: 808 """ 809 Load cached individual tweets if available. 810 Returns dict mapping tweet_id -> tweet_data for found tweets. 811 """ 812 cached_tweets = {} 813 814 for tweet_id in tweet_ids: 815 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 816 if cache_file.exists(): 817 try: 818 with open(cache_file, 'r') as f: 819 cached_data = json.load(f) 820 821 # Use longer cache times for older tweets (24 hours vs 1 hour) 822 from datetime import datetime, timedelta 823 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 824 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '') 825 826 # Parse tweet creation time to determine age 827 try: 828 from dateutil.parser import parse 829 tweet_age = datetime.now() - parse(tweet_created) 830 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1) 831 except: 832 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails 833 834 if datetime.now() - cached_time < cache_duration: 835 cached_tweets[tweet_id] = cached_data.get('tweet_data') 836 logger.debug(f"Using cached tweet {tweet_id}") 837 838 except Exception as e: 839 logger.warning(f"Error loading cached tweet {tweet_id}: {e}") 840 841 return cached_tweets 842 843def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None): 844 """Save individual tweets to cache for future reuse.""" 845 try: 846 X_CACHE_DIR.mkdir(exist_ok=True) 847 848 for tweet in tweets_data: 849 tweet_id = tweet.get('id') 850 if not tweet_id: 851 continue 852 853 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 854 855 # Include user data if available 856 tweet_with_user = tweet.copy() 857 if users_data and tweet.get('author_id') in users_data: 858 tweet_with_user['author_info'] = users_data[tweet.get('author_id')] 859 860 cache_data = { 861 'tweet_id': tweet_id, 862 'tweet_data': tweet_with_user, 863 'cached_at': datetime.now().isoformat() 864 } 865 866 with open(cache_file, 'w') as f: 867 json.dump(cache_data, f, indent=2) 868 869 logger.debug(f"Cached individual tweet {tweet_id}") 870 871 except Exception as e: 872 logger.error(f"Error caching individual tweets: {e}") 873 874def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 875 """ 876 Determine if we have sufficient context to skip backfilling missing tweets. 877 878 Args: 879 tweets: List of tweets already in the thread 880 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch 881 882 Returns: 883 True if context is sufficient, False if backfill is needed 884 """ 885 # If no missing tweets, context is sufficient 886 if not missing_tweet_ids: 887 return True 888 889 # If we have a substantial conversation (5+ tweets), likely sufficient 890 if len(tweets) >= 5: 891 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient") 892 return True 893 894 # If only a few missing tweets and we have some context, might be enough 895 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3: 896 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient") 897 return True 898 899 # Check if we have conversational flow (mentions between users) 900 has_conversation_flow = False 901 for tweet in tweets: 902 text = tweet.get('text', '').lower() 903 # Look for mentions, replies, or conversational indicators 904 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1: 905 has_conversation_flow = True 906 break 907 908 # If we have clear conversational flow and reasonable length, sufficient 909 if has_conversation_flow and len(tweets) >= 2: 910 logger.debug("Thread has conversational flow, considering sufficient") 911 return True 912 913 # Otherwise, we need to backfill 914 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow") 915 return False 916 917def fetch_and_queue_mentions(username: str) -> int: 918 """ 919 Single-pass function to fetch new mentions and queue them. 920 Returns number of new mentions found. 921 """ 922 try: 923 client = create_x_client() 924 925 # Load last seen ID for incremental fetching 926 last_seen_id = load_last_seen_id() 927 928 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 929 930 # Search for mentions 931 mentions = client.search_mentions( 932 username=username, 933 since_id=last_seen_id, 934 max_results=100 # Get as many as possible 935 ) 936 937 if not mentions: 938 logger.info("No new mentions found") 939 return 0 940 941 # Process mentions (newest first, so reverse to process oldest first) 942 mentions.reverse() 943 new_count = 0 944 945 for mention in mentions: 946 save_mention_to_queue(mention) 947 new_count += 1 948 949 # Update last seen ID to the most recent mention 950 if mentions: 951 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 952 save_last_seen_id(most_recent_id) 953 954 logger.info(f"Queued {new_count} new X mentions") 955 return new_count 956 957 except Exception as e: 958 logger.error(f"Error fetching and queuing mentions: {e}") 959 return 0 960 961# Simple test function 962def get_my_user_info(): 963 """Get the authenticated user's information to find correct user ID.""" 964 try: 965 client = create_x_client() 966 967 # Use the /2/users/me endpoint to get authenticated user info 968 endpoint = "/users/me" 969 params = { 970 "user.fields": "id,name,username,description" 971 } 972 973 print("Fetching authenticated user information...") 974 response = client._make_request(endpoint, params=params) 975 976 if response and "data" in response: 977 user_data = response["data"] 978 print(f"✅ Found authenticated user:") 979 print(f" ID: {user_data.get('id')}") 980 print(f" Username: @{user_data.get('username')}") 981 print(f" Name: {user_data.get('name')}") 982 print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 983 print(f"\n🔧 Update your config.yaml with:") 984 print(f" user_id: \"{user_data.get('id')}\"") 985 return user_data 986 else: 987 print("❌ Failed to get user information") 988 print(f"Response: {response}") 989 return None 990 991 except Exception as e: 992 print(f"Error getting user info: {e}") 993 return None 994 995def test_search_mentions(): 996 """Test the search-based mention detection.""" 997 try: 998 client = create_x_client() 999 1000 # First get our username 1001 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1002 if not user_info or "data" not in user_info: 1003 print("❌ Could not get username") 1004 return 1005 1006 username = user_info["data"]["username"] 1007 print(f"🔍 Searching for mentions of @{username}") 1008 1009 mentions = client.search_mentions(username, max_results=5) 1010 1011 if mentions: 1012 print(f"✅ Found {len(mentions)} mentions via search:") 1013 for mention in mentions: 1014 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 1015 else: 1016 print("No mentions found via search") 1017 1018 except Exception as e: 1019 print(f"Search test failed: {e}") 1020 1021def test_fetch_and_queue(): 1022 """Test the single-pass fetch and queue function.""" 1023 try: 1024 client = create_x_client() 1025 1026 # Get our username 1027 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1028 if not user_info or "data" not in user_info: 1029 print("❌ Could not get username") 1030 return 1031 1032 username = user_info["data"]["username"] 1033 print(f"🔄 Fetching and queueing mentions for @{username}") 1034 1035 # Show current state 1036 last_seen = load_last_seen_id() 1037 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 1038 1039 # Fetch and queue 1040 new_count = fetch_and_queue_mentions(username) 1041 1042 if new_count > 0: 1043 print(f"✅ Queued {new_count} new mentions") 1044 print(f"📁 Check ./x_queue/ directory for queued mentions") 1045 1046 # Show updated state 1047 new_last_seen = load_last_seen_id() 1048 print(f"📍 Updated last seen ID: {new_last_seen}") 1049 else: 1050 print("ℹ️ No new mentions to queue") 1051 1052 except Exception as e: 1053 print(f"Fetch and queue test failed: {e}") 1054 1055def test_thread_context(): 1056 """Test thread context retrieval from a queued mention.""" 1057 try: 1058 import json 1059 1060 # Find a queued mention file 1061 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1062 if not queue_files: 1063 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1064 return 1065 1066 # Read the first mention 1067 mention_file = queue_files[0] 1068 with open(mention_file, 'r') as f: 1069 mention_data = json.load(f) 1070 1071 mention = mention_data['mention'] 1072 print(f"📄 Using mention: {mention.get('id')}") 1073 print(f"📝 Text: {mention.get('text')}") 1074 1075 # Check if it has a conversation_id 1076 conversation_id = mention.get('conversation_id') 1077 if not conversation_id: 1078 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.") 1079 return 1080 1081 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1082 1083 # Get thread context 1084 client = create_x_client() 1085 thread_data = client.get_thread_context(conversation_id) 1086 1087 if thread_data: 1088 tweets = thread_data.get('tweets', []) 1089 print(f"✅ Retrieved thread with {len(tweets)} tweets") 1090 1091 # Convert to YAML 1092 yaml_thread = thread_to_yaml_string(thread_data) 1093 1094 # Save thread context for inspection 1095 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml" 1096 with open(thread_file, 'w') as f: 1097 f.write(yaml_thread) 1098 1099 print(f"💾 Saved thread context to: {thread_file}") 1100 print("\n📋 Thread preview:") 1101 print(yaml_thread) 1102 else: 1103 print("❌ Failed to retrieve thread context") 1104 1105 except Exception as e: 1106 print(f"Thread context test failed: {e}") 1107 1108def test_letta_integration(agent_id: str = None): 1109 """Test sending X thread context to Letta agent.""" 1110 try: 1111 from letta_client import Letta 1112 import json 1113 import yaml 1114 1115 # Load full config to access letta section 1116 try: 1117 with open("config.yaml", 'r') as f: 1118 full_config = yaml.safe_load(f) 1119 1120 letta_config = full_config.get('letta', {}) 1121 api_key = letta_config.get('api_key') 1122 config_agent_id = letta_config.get('agent_id') 1123 1124 # Use agent_id from config if not provided as parameter 1125 if not agent_id: 1126 if config_agent_id: 1127 agent_id = config_agent_id 1128 print(f"ℹ️ Using agent_id from config: {agent_id}") 1129 else: 1130 print("❌ No agent_id found in config.yaml") 1131 print("Expected config structure:") 1132 print(" letta:") 1133 print(" agent_id: your-agent-id") 1134 return 1135 else: 1136 print(f"ℹ️ Using provided agent_id: {agent_id}") 1137 1138 if not api_key: 1139 # Try loading from environment as fallback 1140 import os 1141 api_key = os.getenv('LETTA_API_KEY') 1142 if not api_key: 1143 print("❌ LETTA_API_KEY not found in config.yaml or environment") 1144 print("Expected config structure:") 1145 print(" letta:") 1146 print(" api_key: your-letta-api-key") 1147 return 1148 else: 1149 print("ℹ️ Using LETTA_API_KEY from environment") 1150 else: 1151 print("ℹ️ Using LETTA_API_KEY from config.yaml") 1152 1153 except Exception as e: 1154 print(f"❌ Error loading config: {e}") 1155 return 1156 1157 letta_client = Letta(token=api_key, timeout=600) 1158 print(f"🤖 Connected to Letta, using agent: {agent_id}") 1159 1160 # Find a queued mention file 1161 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1162 if not queue_files: 1163 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1164 return 1165 1166 # Read the first mention 1167 mention_file = queue_files[0] 1168 with open(mention_file, 'r') as f: 1169 mention_data = json.load(f) 1170 1171 mention = mention_data['mention'] 1172 conversation_id = mention.get('conversation_id') 1173 1174 if not conversation_id: 1175 print("❌ No conversation_id found in mention.") 1176 return 1177 1178 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1179 1180 # Get thread context 1181 x_client = create_x_client() 1182 thread_data = x_client.get_thread_context(conversation_id) 1183 1184 if not thread_data: 1185 print("❌ Failed to retrieve thread context") 1186 return 1187 1188 # Convert to YAML 1189 yaml_thread = thread_to_yaml_string(thread_data) 1190 1191 # Create prompt for the agent 1192 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 1193 1194Here is the thread context: 1195 1196{yaml_thread} 1197 1198Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 1199 1200 print(f"📤 Sending thread context to Letta agent...") 1201 1202 # Print the prompt in a rich panel 1203 rprint(Panel(prompt, title="Prompt", border_style="blue")) 1204 1205 # Send to Letta agent using streaming 1206 message_stream = letta_client.agents.messages.create_stream( 1207 agent_id=agent_id, 1208 messages=[{"role": "user", "content": prompt}], 1209 stream_tokens=False, 1210 max_steps=10 1211 ) 1212 1213 print("🔄 Streaming response from agent...") 1214 response_text = "" 1215 1216 for chunk in message_stream: 1217 print(chunk) 1218 if hasattr(chunk, 'message_type'): 1219 if chunk.message_type == 'assistant_message': 1220 print(f"🤖 Agent response: {chunk.content}") 1221 response_text = chunk.content 1222 elif chunk.message_type == 'reasoning_message': 1223 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 1224 elif chunk.message_type == 'tool_call_message': 1225 print(f"🔧 Agent tool call: {chunk.tool_call.name}") 1226 1227 if response_text: 1228 print(f"\n✅ Agent generated response:") 1229 print(f"📝 Response: {response_text}") 1230 else: 1231 print("❌ No response generated by agent") 1232 1233 except Exception as e: 1234 print(f"Letta integration test failed: {e}") 1235 import traceback 1236 traceback.print_exc() 1237 1238def test_x_client(): 1239 """Test the X client by fetching mentions.""" 1240 try: 1241 client = create_x_client() 1242 mentions = client.get_mentions(max_results=5) 1243 1244 if mentions: 1245 print(f"Successfully retrieved {len(mentions)} mentions:") 1246 for mention in mentions: 1247 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...") 1248 else: 1249 print("No mentions retrieved") 1250 1251 except Exception as e: 1252 print(f"Test failed: {e}") 1253 1254def reply_to_cameron_post(): 1255 """ 1256 Reply to Cameron's specific X post. 1257 1258 NOTE: This requires OAuth User Context authentication, not Bearer token. 1259 Current Bearer token is Application-Only which can't post. 1260 """ 1261 try: 1262 client = create_x_client() 1263 1264 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1265 cameron_post_id = "1950690566909710618" 1266 1267 # Simple reply message 1268 reply_text = "Hello from void! 🤖 Testing X integration." 1269 1270 print(f"Attempting to reply to post {cameron_post_id}") 1271 print(f"Reply text: {reply_text}") 1272 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1273 print("Posting requires OAuth User Context authentication") 1274 1275 result = client.post_reply(reply_text, cameron_post_id) 1276 1277 if result: 1278 print(f"✅ Successfully posted reply!") 1279 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1280 else: 1281 print("❌ Failed to post reply (expected with current auth)") 1282 1283 except Exception as e: 1284 print(f"Reply failed: {e}") 1285 1286def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1287 """ 1288 Process an X mention and generate a reply using the Letta agent. 1289 Similar to bsky.py process_mention but for X/Twitter. 1290 1291 Args: 1292 void_agent: The Letta agent instance 1293 x_client: The X API client 1294 mention_data: The mention data dictionary 1295 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1296 testing_mode: If True, don't actually post to X 1297 1298 Returns: 1299 True: Successfully processed, remove from queue 1300 False: Failed but retryable, keep in queue 1301 None: Failed with non-retryable error, move to errors directory 1302 "no_reply": No reply was generated, move to no_reply directory 1303 """ 1304 try: 1305 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1306 1307 # Extract mention details 1308 if isinstance(mention_data, dict): 1309 # Handle both raw mention and queued mention formats 1310 if 'mention' in mention_data: 1311 mention = mention_data['mention'] 1312 else: 1313 mention = mention_data 1314 else: 1315 mention = mention_data 1316 1317 mention_id = mention.get('id') 1318 mention_text = mention.get('text', '') 1319 author_id = mention.get('author_id') 1320 conversation_id = mention.get('conversation_id') 1321 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1322 referenced_tweets = mention.get('referenced_tweets', []) 1323 1324 # Check downrank list - only respond to downranked users 10% of the time 1325 downrank_users = load_downrank_users() 1326 if not should_respond_to_downranked_user(str(author_id), downrank_users): 1327 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection") 1328 return "no_reply" 1329 1330 # Enhanced conversation tracking for debug - especially important for Grok handling 1331 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1332 logger.info(f" Author ID: {author_id}") 1333 logger.info(f" Conversation ID: {conversation_id}") 1334 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1335 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1336 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1337 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1338 logger.info(f" Text preview: {mention_text[:100]}...") 1339 1340 if not conversation_id: 1341 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues") 1342 return None 1343 1344 # Get thread context with caching enabled for efficiency 1345 # Use mention_id as until_id to exclude tweets that occurred after this mention 1346 try: 1347 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id) 1348 if not thread_data: 1349 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1350 return False 1351 1352 # If this mention references a specific tweet, ensure we have that tweet in context 1353 if referenced_tweets: 1354 for ref in referenced_tweets: 1355 if ref.get('type') == 'replied_to': 1356 ref_id = ref.get('id') 1357 # Check if the referenced tweet is in our thread data 1358 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1359 if ref_id and ref_id not in thread_tweet_ids: 1360 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1361 try: 1362 # Fetch the missing referenced tweet directly 1363 endpoint = f"/tweets/{ref_id}" 1364 params = { 1365 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1366 "user.fields": "id,name,username", 1367 "expansions": "author_id" 1368 } 1369 response = x_client._make_request(endpoint, params) 1370 if response and "data" in response: 1371 missing_tweet = response["data"] 1372 if missing_tweet.get('conversation_id') == conversation_id: 1373 # Add to thread data 1374 if 'tweets' not in thread_data: 1375 thread_data['tweets'] = [] 1376 thread_data['tweets'].append(missing_tweet) 1377 1378 # Add user data if available 1379 if "includes" in response and "users" in response["includes"]: 1380 if 'users' not in thread_data: 1381 thread_data['users'] = {} 1382 for user in response["includes"]["users"]: 1383 thread_data['users'][user["id"]] = user 1384 1385 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1386 else: 1387 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}") 1388 except Exception as e: 1389 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1390 1391 # Enhanced thread context debugging 1392 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1393 thread_posts = thread_data.get('tweets', []) 1394 thread_users = thread_data.get('users', {}) 1395 logger.info(f" Posts in thread: {len(thread_posts)}") 1396 logger.info(f" Users in thread: {len(thread_users)}") 1397 1398 # Log thread participants for Grok detection 1399 for user_id, user_info in thread_users.items(): 1400 username = user_info.get('username', 'unknown') 1401 name = user_info.get('name', 'Unknown') 1402 is_verified = user_info.get('verified', False) 1403 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1404 1405 # Special logging for Grok or AI-related users 1406 if 'grok' in username.lower() or 'grok' in name.lower(): 1407 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1408 1409 # Log conversation structure 1410 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1411 post_id = post.get('id') 1412 post_author = post.get('author_id') 1413 post_text = post.get('text', '')[:50] 1414 is_reply = 'in_reply_to_user_id' in post 1415 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1416 1417 except Exception as e: 1418 logger.error(f"❌ Error getting thread context: {e}") 1419 return False 1420 1421 # Convert to YAML string 1422 thread_context = thread_to_yaml_string(thread_data) 1423 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1424 1425 # Save comprehensive conversation data for debugging 1426 try: 1427 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1428 debug_dir.mkdir(parents=True, exist_ok=True) 1429 1430 # Save raw thread data (JSON) 1431 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 1432 json.dump(thread_data, f, indent=2) 1433 1434 # Save YAML thread context 1435 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1436 f.write(thread_context) 1437 1438 # Save mention processing debug info 1439 debug_info = { 1440 'processed_at': datetime.now().isoformat(), 1441 'mention_id': mention_id, 1442 'conversation_id': conversation_id, 1443 'author_id': author_id, 1444 'in_reply_to_user_id': in_reply_to_user_id, 1445 'referenced_tweets': referenced_tweets, 1446 'thread_stats': { 1447 'total_posts': len(thread_posts), 1448 'total_users': len(thread_users), 1449 'yaml_length': len(thread_context) 1450 }, 1451 'users_in_conversation': { 1452 user_id: { 1453 'username': user_info.get('username'), 1454 'name': user_info.get('name'), 1455 'verified': user_info.get('verified', False), 1456 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1457 } 1458 for user_id, user_info in thread_users.items() 1459 } 1460 } 1461 1462 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1463 json.dump(debug_info, f, indent=2) 1464 1465 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1466 1467 except Exception as debug_error: 1468 logger.warning(f"Failed to save debug data: {debug_error}") 1469 # Continue processing even if debug save fails 1470 1471 # Check for #voidstop 1472 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1473 logger.info("Found #voidstop, skipping this mention") 1474 return True 1475 1476 # Ensure X user blocks are attached 1477 try: 1478 ensure_x_user_blocks_attached(thread_data, void_agent.id) 1479 except Exception as e: 1480 logger.warning(f"Failed to ensure X user blocks: {e}") 1481 # Continue without user blocks rather than failing completely 1482 1483 # Create prompt for Letta agent 1484 author_info = thread_data.get('users', {}).get(author_id, {}) 1485 author_username = author_info.get('username', 'unknown') 1486 author_name = author_info.get('name', author_username) 1487 1488 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1489 1490MOST RECENT POST (the mention you're responding to): 1491"{mention_text}" 1492 1493FULL THREAD CONTEXT: 1494```yaml 1495{thread_context} 1496``` 1497 1498The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1499 1500If you need to update user information, use the x_user_* tools. 1501 1502To reply, use the add_post_to_x_thread tool: 1503- Each call creates one post (max 280 characters) 1504- For most responses, a single call is sufficient 1505- Only use multiple calls for threaded replies when: 1506 * The topic requires extended explanation that cannot fit in 280 characters 1507 * You're explicitly asked for a detailed/long response 1508 * The conversation naturally benefits from a structured multi-part answer 1509- Avoid unnecessary threads - be concise when possible""" 1510 1511 # Log mention processing 1512 title = f"X MENTION FROM @{author_username}" 1513 print(f"\n{title}") 1514 print(f" {'' * len(title)}") 1515 for line in mention_text.split('\n'): 1516 print(f" {line}") 1517 1518 # Send to Letta agent 1519 from config_loader import get_letta_config 1520 from letta_client import Letta 1521 1522 config = get_letta_config() 1523 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1524 1525 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars") 1526 1527 try: 1528 # Use streaming to avoid timeout errors 1529 message_stream = letta_client.agents.messages.create_stream( 1530 agent_id=void_agent.id, 1531 messages=[{"role": "user", "content": prompt}], 1532 stream_tokens=False, 1533 max_steps=100 1534 ) 1535 1536 # Collect streaming response (simplified version of bsky.py logic) 1537 all_messages = [] 1538 for chunk in message_stream: 1539 if hasattr(chunk, 'message_type'): 1540 if chunk.message_type == 'reasoning_message': 1541 print("\n◆ Reasoning") 1542 print(" ─────────") 1543 for line in chunk.reasoning.split('\n'): 1544 print(f" {line}") 1545 elif chunk.message_type == 'tool_call_message': 1546 tool_name = chunk.tool_call.name 1547 if tool_name == 'add_post_to_x_thread': 1548 try: 1549 args = json.loads(chunk.tool_call.arguments) 1550 text = args.get('text', '') 1551 if text: 1552 print("\n✎ X Post") 1553 print(" ────────") 1554 for line in text.split('\n'): 1555 print(f" {line}") 1556 except: 1557 pass 1558 elif tool_name == 'halt_activity': 1559 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1560 if queue_filepath and queue_filepath.exists(): 1561 queue_filepath.unlink() 1562 logger.info(f"Deleted queue file: {queue_filepath.name}") 1563 logger.info("=== X BOT TERMINATED BY AGENT ===") 1564 exit(0) 1565 elif chunk.message_type == 'tool_return_message': 1566 tool_name = chunk.name 1567 status = chunk.status 1568 if status == 'success' and tool_name == 'add_post_to_x_thread': 1569 print("\n✓ X Post Queued") 1570 print(" ──────────────") 1571 print(" Post queued successfully") 1572 elif chunk.message_type == 'assistant_message': 1573 print("\n▶ Assistant Response") 1574 print(" ──────────────────") 1575 for line in chunk.content.split('\n'): 1576 print(f" {line}") 1577 1578 all_messages.append(chunk) 1579 if str(chunk) == 'done': 1580 break 1581 1582 # Convert streaming response for compatibility 1583 message_response = type('StreamingResponse', (), { 1584 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1585 })() 1586 1587 except Exception as api_error: 1588 logger.error(f"Letta API error: {api_error}") 1589 raise 1590 1591 # Extract successful add_post_to_x_thread tool calls 1592 reply_candidates = [] 1593 tool_call_results = {} 1594 ignored_notification = False 1595 ack_note = None # Track any note from annotate_ack tool 1596 1597 # First pass: collect tool return statuses 1598 for message in message_response.messages: 1599 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1600 if message.name == 'add_post_to_x_thread': 1601 tool_call_results[message.tool_call_id] = message.status 1602 elif message.name == 'ignore_notification': 1603 if message.status == 'success': 1604 ignored_notification = True 1605 logger.info("🚫 X notification ignored") 1606 1607 # Second pass: collect successful tool calls 1608 for message in message_response.messages: 1609 if hasattr(message, 'tool_call') and message.tool_call: 1610 # Collect annotate_ack tool calls 1611 if message.tool_call.name == 'annotate_ack': 1612 try: 1613 args = json.loads(message.tool_call.arguments) 1614 note = args.get('note', '') 1615 if note: 1616 ack_note = note 1617 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1618 except json.JSONDecodeError as e: 1619 logger.error(f"Failed to parse annotate_ack arguments: {e}") 1620 1621 # Collect add_post_to_x_thread tool calls - only if they were successful 1622 elif message.tool_call.name == 'add_post_to_x_thread': 1623 tool_call_id = message.tool_call.tool_call_id 1624 tool_status = tool_call_results.get(tool_call_id, 'unknown') 1625 1626 if tool_status == 'success': 1627 try: 1628 args = json.loads(message.tool_call.arguments) 1629 reply_text = args.get('text', '') 1630 if reply_text: 1631 reply_candidates.append(reply_text) 1632 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1633 except json.JSONDecodeError as e: 1634 logger.error(f"Failed to parse tool call arguments: {e}") 1635 1636 # Save agent response data to debug folder 1637 try: 1638 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1639 1640 # Save complete agent interaction 1641 agent_response_data = { 1642 'processed_at': datetime.now().isoformat(), 1643 'mention_id': mention_id, 1644 'conversation_id': conversation_id, 1645 'prompt_sent': prompt, 1646 'reply_candidates': reply_candidates, 1647 'ignored_notification': ignored_notification, 1648 'ack_note': ack_note, 1649 'tool_call_results': tool_call_results, 1650 'all_messages': [] 1651 } 1652 1653 # Convert messages to serializable format 1654 for message in message_response.messages: 1655 msg_data = { 1656 'message_type': getattr(message, 'message_type', 'unknown'), 1657 'content': getattr(message, 'content', ''), 1658 'reasoning': getattr(message, 'reasoning', ''), 1659 'status': getattr(message, 'status', ''), 1660 'name': getattr(message, 'name', ''), 1661 } 1662 1663 if hasattr(message, 'tool_call') and message.tool_call: 1664 msg_data['tool_call'] = { 1665 'name': message.tool_call.name, 1666 'arguments': message.tool_call.arguments, 1667 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '') 1668 } 1669 1670 agent_response_data['all_messages'].append(msg_data) 1671 1672 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f: 1673 json.dump(agent_response_data, f, indent=2) 1674 1675 logger.info(f"💾 Saved agent response debug data") 1676 1677 except Exception as debug_error: 1678 logger.warning(f"Failed to save agent response debug data: {debug_error}") 1679 1680 # Handle conflicts 1681 if reply_candidates and ignored_notification: 1682 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1683 return False 1684 1685 if reply_candidates: 1686 # Post replies to X 1687 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1688 1689 if len(reply_candidates) == 1: 1690 content = reply_candidates[0] 1691 title = f"Reply to @{author_username}" 1692 else: 1693 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1694 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1695 1696 print(f"\n{title}") 1697 print(f" {'' * len(title)}") 1698 for line in content.split('\n'): 1699 print(f" {line}") 1700 1701 if testing_mode: 1702 logger.info("TESTING MODE: Skipping actual X post") 1703 return True 1704 else: 1705 # Post to X using thread approach 1706 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1707 if success: 1708 logger.info(f"Successfully replied to @{author_username} on X") 1709 1710 # Acknowledge the post we're replying to 1711 try: 1712 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1713 if ack_result: 1714 if ack_note: 1715 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1716 else: 1717 logger.info(f"Successfully acknowledged X post from @{author_username}") 1718 else: 1719 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1720 except Exception as e: 1721 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1722 # Don't fail the entire operation if acknowledgment fails 1723 1724 return True 1725 else: 1726 logger.error(f"Failed to send reply to @{author_username} on X") 1727 return False 1728 else: 1729 if ignored_notification: 1730 logger.info(f"X mention from @{author_username} was explicitly ignored") 1731 return "ignored" 1732 else: 1733 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1734 return False # Keep in queue for retry instead of removing 1735 1736 except Exception as e: 1737 logger.error(f"Error processing X mention: {e}") 1738 return False 1739 1740def acknowledge_x_post(x_client, post_id, note=None): 1741 """ 1742 Acknowledge an X post that we replied to. 1743 Uses the same Bluesky client and uploads to the void data repository on atproto, 1744 just like Bluesky acknowledgments. 1745 1746 Args: 1747 x_client: XClient instance (not used, kept for compatibility) 1748 post_id: The X post ID we're acknowledging 1749 note: Optional note to include with the acknowledgment 1750 1751 Returns: 1752 True if successful, False otherwise 1753 """ 1754 try: 1755 # Use Bluesky client to upload acks to the void data repository on atproto 1756 bsky_client = bsky_utils.default_login() 1757 1758 # Create a synthetic URI and CID for the X post 1759 # X posts don't have atproto URIs/CIDs, so we create identifiers 1760 post_uri = f"x://twitter.com/post/{post_id}" 1761 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1762 1763 # Use the same acknowledge_post function as Bluesky 1764 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1765 1766 if ack_result: 1767 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1768 return True 1769 else: 1770 logger.error(f"Failed to acknowledge X post {post_id}") 1771 return False 1772 1773 except Exception as e: 1774 logger.error(f"Error acknowledging X post {post_id}: {e}") 1775 return False 1776 1777def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1778 """ 1779 Post a series of replies to X, threading them properly. 1780 1781 Args: 1782 x_client: XClient instance 1783 in_reply_to_tweet_id: The original tweet ID to reply to 1784 reply_messages: List of reply text strings 1785 1786 Returns: 1787 True if successful, False otherwise 1788 """ 1789 try: 1790 current_reply_id = in_reply_to_tweet_id 1791 1792 for i, reply_text in enumerate(reply_messages): 1793 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1794 1795 result = x_client.post_reply(reply_text, current_reply_id) 1796 1797 if result and 'data' in result: 1798 new_tweet_id = result['data']['id'] 1799 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1800 # For threading, the next reply should reply to this one 1801 current_reply_id = new_tweet_id 1802 else: 1803 logger.error(f"Failed to post X reply {i+1}") 1804 return False 1805 1806 return True 1807 1808 except Exception as e: 1809 logger.error(f"Error posting X thread replies: {e}") 1810 return False 1811 1812def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1813 """ 1814 Load and process all X mentions from the queue. 1815 Similar to bsky.py load_and_process_queued_notifications but for X. 1816 """ 1817 try: 1818 # Get all X mention files in queue directory 1819 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1820 1821 if not queue_files: 1822 return 1823 1824 # Load file metadata and sort by creation time (chronological order) 1825 file_metadata = [] 1826 for filepath in queue_files: 1827 try: 1828 with open(filepath, 'r') as f: 1829 queue_data = json.load(f) 1830 mention_data = queue_data.get('mention', queue_data) 1831 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing 1832 file_metadata.append((created_at, filepath)) 1833 except Exception as e: 1834 logger.warning(f"Error reading queue file {filepath.name}: {e}") 1835 # Add with default timestamp so it still gets processed 1836 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath)) 1837 1838 # Sort by creation time (oldest first) 1839 file_metadata.sort(key=lambda x: x[0]) 1840 1841 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order") 1842 1843 for i, (created_at, filepath) in enumerate(file_metadata, 1): 1844 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})") 1845 1846 try: 1847 # Load mention data 1848 with open(filepath, 'r') as f: 1849 queue_data = json.load(f) 1850 1851 mention_data = queue_data.get('mention', queue_data) 1852 1853 # Process the mention 1854 success = process_x_mention(void_agent, x_client, mention_data, 1855 queue_filepath=filepath, testing_mode=testing_mode) 1856 1857 except XRateLimitError: 1858 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning") 1859 break 1860 1861 except Exception as e: 1862 logger.error(f"Error processing X queue file {filepath.name}: {e}") 1863 continue 1864 1865 # Handle file based on processing result 1866 if success: 1867 if testing_mode: 1868 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1869 else: 1870 filepath.unlink() 1871 logger.info(f"Successfully processed and removed X file: {filepath.name}") 1872 1873 # Mark as processed 1874 processed_mentions = load_processed_mentions() 1875 processed_mentions.add(mention_data.get('id')) 1876 save_processed_mentions(processed_mentions) 1877 1878 elif success is None: # Move to error directory 1879 error_dir = X_QUEUE_DIR / "errors" 1880 error_dir.mkdir(exist_ok=True) 1881 error_path = error_dir / filepath.name 1882 filepath.rename(error_path) 1883 logger.warning(f"Moved X file {filepath.name} to errors directory") 1884 1885 elif success == "no_reply": # Move to no_reply directory 1886 no_reply_dir = X_QUEUE_DIR / "no_reply" 1887 no_reply_dir.mkdir(exist_ok=True) 1888 no_reply_path = no_reply_dir / filepath.name 1889 filepath.rename(no_reply_path) 1890 logger.info(f"Moved X file {filepath.name} to no_reply directory") 1891 1892 elif success == "ignored": # Delete ignored notifications 1893 filepath.unlink() 1894 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1895 1896 else: 1897 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 1898 1899 except Exception as e: 1900 logger.error(f"Error loading queued X mentions: {e}") 1901 1902def process_x_notifications(void_agent, x_client, testing_mode=False): 1903 """ 1904 Fetch new X mentions, queue them, and process the queue. 1905 Similar to bsky.py process_notifications but for X. 1906 """ 1907 try: 1908 # Get username for fetching mentions 1909 user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 1910 if not user_info or "data" not in user_info: 1911 logger.error("Could not get username for X mentions") 1912 return 1913 1914 username = user_info["data"]["username"] 1915 1916 # Fetch and queue new mentions 1917 new_count = fetch_and_queue_mentions(username) 1918 1919 if new_count > 0: 1920 logger.info(f"Found {new_count} new X mentions to process") 1921 1922 # Process the entire queue 1923 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1924 1925 except Exception as e: 1926 logger.error(f"Error processing X notifications: {e}") 1927 1928def initialize_x_void(): 1929 """Initialize the void agent for X operations.""" 1930 logger.info("Starting void agent initialization for X...") 1931 1932 from config_loader import get_letta_config 1933 from letta_client import Letta 1934 1935 # Get config 1936 config = get_letta_config() 1937 client = Letta(token=config['api_key'], timeout=config['timeout']) 1938 agent_id = config['agent_id'] 1939 1940 try: 1941 void_agent = client.agents.retrieve(agent_id=agent_id) 1942 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 1943 except Exception as e: 1944 logger.error(f"Failed to load void agent {agent_id}: {e}") 1945 raise e 1946 1947 # Ensure correct tools are attached for X 1948 logger.info("Configuring tools for X platform...") 1949 try: 1950 from tool_manager import ensure_platform_tools 1951 ensure_platform_tools('x', void_agent.id) 1952 except Exception as e: 1953 logger.error(f"Failed to configure platform tools: {e}") 1954 logger.warning("Continuing with existing tool configuration") 1955 1956 # Log agent details 1957 logger.info(f"X Void agent details - ID: {void_agent.id}") 1958 logger.info(f"Agent name: {void_agent.name}") 1959 1960 return void_agent 1961 1962def x_main_loop(testing_mode=False): 1963 """ 1964 Main X bot loop that continuously monitors for mentions and processes them. 1965 Similar to bsky.py main() but for X/Twitter. 1966 """ 1967 import time 1968 from time import sleep 1969 1970 logger.info("=== STARTING X VOID BOT ===") 1971 1972 # Initialize void agent 1973 void_agent = initialize_x_void() 1974 logger.info(f"X void agent initialized: {void_agent.id}") 1975 1976 # Initialize X client 1977 x_client = create_x_client() 1978 logger.info("Connected to X API") 1979 1980 # Main loop 1981 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls) 1982 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 1983 1984 if testing_mode: 1985 logger.info("=== RUNNING IN X TESTING MODE ===") 1986 logger.info(" - No messages will be sent to X") 1987 logger.info(" - Queue files will not be deleted") 1988 1989 cycle_count = 0 1990 start_time = time.time() 1991 1992 while True: 1993 try: 1994 cycle_count += 1 1995 logger.info(f"=== X CYCLE {cycle_count} ===") 1996 1997 # Process X notifications (fetch, queue, and process) 1998 process_x_notifications(void_agent, x_client, testing_mode) 1999 2000 # Log cycle completion 2001 elapsed_time = time.time() - start_time 2002 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 2003 2004 sleep(FETCH_DELAY_SEC) 2005 2006 except KeyboardInterrupt: 2007 elapsed_time = time.time() - start_time 2008 logger.info("=== X BOT STOPPED BY USER ===") 2009 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 2010 break 2011 except Exception as e: 2012 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 2013 logger.error(f"Error details: {e}") 2014 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 2015 sleep(FETCH_DELAY_SEC * 2) 2016 2017def process_queue_only(testing_mode=False): 2018 """ 2019 Process all queued X mentions without fetching new ones. 2020 Useful for rate limit management - queue first, then process separately. 2021 2022 Args: 2023 testing_mode: If True, don't actually post to X and keep queue files 2024 """ 2025 logger.info("=== PROCESSING X QUEUE ONLY ===") 2026 2027 if testing_mode: 2028 logger.info("=== RUNNING IN X TESTING MODE ===") 2029 logger.info(" - No messages will be sent to X") 2030 logger.info(" - Queue files will not be deleted") 2031 2032 try: 2033 # Initialize void agent 2034 void_agent = initialize_x_void() 2035 logger.info(f"X void agent initialized: {void_agent.id}") 2036 2037 # Initialize X client 2038 x_client = create_x_client() 2039 logger.info("Connected to X API") 2040 2041 # Process the queue without fetching new mentions 2042 logger.info("Processing existing X queue...") 2043 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 2044 2045 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 2046 2047 except Exception as e: 2048 logger.error(f"Error processing X queue: {e}") 2049 raise 2050 2051def x_notification_loop(): 2052 """ 2053 DEPRECATED: Old X notification loop using search-based mention detection. 2054 Use x_main_loop() instead for the full bot experience. 2055 """ 2056 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 2057 x_main_loop() 2058 2059if __name__ == "__main__": 2060 import sys 2061 import argparse 2062 2063 if len(sys.argv) > 1: 2064 if sys.argv[1] == "bot": 2065 # Main bot with optional --test flag 2066 parser = argparse.ArgumentParser(description='X Void Bot') 2067 parser.add_argument('command', choices=['bot']) 2068 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 2069 args = parser.parse_args() 2070 x_main_loop(testing_mode=args.test) 2071 elif sys.argv[1] == "loop": 2072 x_notification_loop() 2073 elif sys.argv[1] == "reply": 2074 reply_to_cameron_post() 2075 elif sys.argv[1] == "me": 2076 get_my_user_info() 2077 elif sys.argv[1] == "search": 2078 test_search_mentions() 2079 elif sys.argv[1] == "queue": 2080 test_fetch_and_queue() 2081 elif sys.argv[1] == "thread": 2082 test_thread_context() 2083 elif sys.argv[1] == "process": 2084 # Process all queued mentions with optional --test flag 2085 testing_mode = "--test" in sys.argv 2086 process_queue_only(testing_mode=testing_mode) 2087 elif sys.argv[1] == "letta": 2088 # Use specific agent ID if provided, otherwise use from config 2089 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 2090 test_letta_integration(agent_id) 2091 elif sys.argv[1] == "downrank": 2092 # View or manage downrank list 2093 if len(sys.argv) > 2 and sys.argv[2] == "list": 2094 downrank_users = load_downrank_users() 2095 if downrank_users: 2096 print(f"📋 Downrank users ({len(downrank_users)} total):") 2097 for user_id in sorted(downrank_users): 2098 print(f" - {user_id}") 2099 else: 2100 print("📋 No downrank users configured") 2101 else: 2102 print("Usage: python x.py downrank list") 2103 print(" list - Show all downranked user IDs") 2104 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list") 2105 else: 2106 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]") 2107 print(" bot - Run the main X bot (use --test for testing mode)") 2108 print(" Example: python x.py bot --test") 2109 print(" queue - Fetch and queue mentions only (no processing)") 2110 print(" process - Process all queued mentions only (no fetching)") 2111 print(" Example: python x.py process --test") 2112 print(" downrank - Manage downrank users (10% response rate)") 2113 print(" Example: python x.py downrank list") 2114 print(" loop - Run the old notification monitoring loop (deprecated)") 2115 print(" reply - Reply to Cameron's specific post") 2116 print(" me - Get authenticated user info and correct user ID") 2117 print(" search - Test search-based mention detection") 2118 print(" thread - Test thread context retrieval from queued mention") 2119 print(" letta - Test sending thread context to Letta agent") 2120 print(" Optional: python x.py letta <agent-id>") 2121 else: 2122 test_x_client()