a digital person for bluesky
at main 96 kB view raw
1import os 2import logging 3import requests 4import yaml 5import json 6import hashlib 7import random 8import time 9from typing import Optional, Dict, Any, List, Set 10from datetime import datetime 11from pathlib import Path 12from requests_oauthlib import OAuth1 13from rich import print as rprint 14from rich.panel import Panel 15from rich.text import Text 16 17import bsky_utils 18 19class XRateLimitError(Exception): 20 """Exception raised when X API rate limit is exceeded""" 21 pass 22 23 24# Configure logging (will be updated by setup_logging_from_config if called) 25logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27) 28logger = logging.getLogger("x_client") 29 30def setup_logging_from_config(config_path: str = "configs/x_config.yaml"): 31 """Configure logging based on configs/x_config.yaml settings.""" 32 try: 33 config = load_x_config(config_path) 34 logging_config = config.get('logging', {}) 35 log_level = logging_config.get('level', 'INFO').upper() 36 37 # Convert string level to logging constant 38 numeric_level = getattr(logging, log_level, logging.INFO) 39 40 # Update the root logger level 41 logging.getLogger().setLevel(numeric_level) 42 # Update our specific logger level 43 logger.setLevel(numeric_level) 44 45 logger.info(f"Logging level set to {log_level}") 46 47 except Exception as e: 48 logger.warning(f"Failed to configure logging from config: {e}, using default INFO level") 49 50# X-specific file paths 51X_QUEUE_DIR = Path("x_queue") 52X_CACHE_DIR = Path("x_cache") 53X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 54X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 55X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt") 56 57class XClient: 58 """X (Twitter) API client for fetching mentions and managing interactions.""" 59 60 def __init__(self, api_key: str, user_id: str, access_token: str = None, 61 consumer_key: str = None, consumer_secret: str = None, 62 access_token_secret: str = None): 63 self.api_key = api_key 64 self.access_token = access_token 65 self.user_id = user_id 66 self.base_url = "https://api.x.com/2" 67 68 # Check if we have OAuth 1.0a credentials 69 if (consumer_key and consumer_secret and access_token and access_token_secret): 70 # Use OAuth 1.0a for User Context 71 self.oauth = OAuth1( 72 consumer_key, 73 client_secret=consumer_secret, 74 resource_owner_key=access_token, 75 resource_owner_secret=access_token_secret 76 ) 77 self.headers = {"Content-Type": "application/json"} 78 self.auth_method = "oauth1a" 79 logger.info("Using OAuth 1.0a User Context authentication for X API") 80 elif access_token: 81 # Use OAuth 2.0 Bearer token for User Context 82 self.oauth = None 83 self.headers = { 84 "Authorization": f"Bearer {access_token}", 85 "Content-Type": "application/json" 86 } 87 self.auth_method = "oauth2_user" 88 logger.info("Using OAuth 2.0 User Context access token for X API") 89 else: 90 # Use Application-Only Bearer token 91 self.oauth = None 92 self.headers = { 93 "Authorization": f"Bearer {api_key}", 94 "Content-Type": "application/json" 95 } 96 self.auth_method = "bearer" 97 logger.info("Using Application-Only Bearer token for X API") 98 99 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 100 """Make a request to the X API with proper error handling and exponential backoff.""" 101 url = f"{self.base_url}{endpoint}" 102 103 # Log the specific API call being made 104 logger.debug(f"Making X API request: {method} {endpoint}") 105 106 for attempt in range(max_retries): 107 try: 108 if method.upper() == "GET": 109 if self.oauth: 110 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 111 else: 112 response = requests.get(url, headers=self.headers, params=params) 113 elif method.upper() == "POST": 114 if self.oauth: 115 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 116 else: 117 response = requests.post(url, headers=self.headers, json=data) 118 else: 119 raise ValueError(f"Unsupported HTTP method: {method}") 120 121 response.raise_for_status() 122 return response.json() 123 124 except requests.exceptions.HTTPError as e: 125 if response.status_code == 401: 126 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 127 logger.error(f"Response: {response.text}") 128 return None # Don't retry auth failures 129 elif response.status_code == 403: 130 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 131 logger.error(f"Response: {response.text}") 132 return None # Don't retry permission failures 133 elif response.status_code == 429: 134 if attempt < max_retries - 1: 135 # Exponential backoff: 60s, 120s, 240s 136 backoff_time = 60 * (2 ** attempt) 137 logger.warning(f"X API rate limit exceeded on {method} {endpoint} (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 138 logger.error(f"Response: {response.text}") 139 time.sleep(backoff_time) 140 continue 141 else: 142 logger.error(f"X API rate limit exceeded on {method} {endpoint} - max retries reached") 143 logger.error(f"Response: {response.text}") 144 raise XRateLimitError(f"X API rate limit exceeded on {method} {endpoint}") 145 else: 146 if attempt < max_retries - 1: 147 # Exponential backoff for other HTTP errors too 148 backoff_time = 30 * (2 ** attempt) 149 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 150 logger.error(f"Response: {response.text}") 151 time.sleep(backoff_time) 152 continue 153 else: 154 logger.error(f"X API request failed after {max_retries} attempts: {e}") 155 logger.error(f"Response: {response.text}") 156 return None 157 158 except Exception as e: 159 if attempt < max_retries - 1: 160 backoff_time = 15 * (2 ** attempt) 161 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 162 time.sleep(backoff_time) 163 continue 164 else: 165 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}") 166 return None 167 168 return None 169 170 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]: 171 """ 172 Fetch mentions for the configured user. 173 174 Args: 175 since_id: Minimum Post ID to include (for getting newer mentions) 176 max_results: Number of results to return (5-100) 177 178 Returns: 179 List of mention objects or None if request failed 180 """ 181 endpoint = f"/users/{self.user_id}/mentions" 182 params = { 183 "max_results": min(max(max_results, 5), 100), # Ensure within API limits 184 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 185 "user.fields": "id,name,username", 186 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 187 } 188 189 if since_id: 190 params["since_id"] = since_id 191 192 logger.info(f"Fetching mentions for user {self.user_id}") 193 response = self._make_request(endpoint, params) 194 195 if response: 196 logger.debug(f"X API response: {response}") 197 198 if response and "data" in response: 199 mentions = response["data"] 200 logger.info(f"Retrieved {len(mentions)} mentions") 201 return mentions 202 else: 203 if response: 204 logger.info(f"No mentions in response. Full response: {response}") 205 else: 206 logger.warning("Request failed - no response received") 207 return [] 208 209 def get_user_info(self, user_id: str) -> Optional[Dict]: 210 """Get information about a specific user.""" 211 endpoint = f"/users/{user_id}" 212 params = { 213 "user.fields": "id,name,username,description,public_metrics" 214 } 215 216 response = self._make_request(endpoint, params) 217 return response.get("data") if response else None 218 219 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 220 """ 221 Search for mentions using the search endpoint instead of mentions endpoint. 222 This might have better rate limits than the direct mentions endpoint. 223 224 Args: 225 username: Username to search for mentions of (without @) 226 max_results: Number of results to return (10-100) 227 since_id: Only return results newer than this tweet ID 228 229 Returns: 230 List of tweets mentioning the username 231 """ 232 endpoint = "/tweets/search/recent" 233 234 # Search for mentions of the username 235 query = f"@{username}" 236 237 params = { 238 "query": query, 239 "max_results": min(max(max_results, 10), 100), 240 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 241 "user.fields": "id,name,username", 242 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 243 } 244 245 if since_id: 246 params["since_id"] = since_id 247 248 logger.info(f"Searching for mentions of @{username}") 249 response = self._make_request(endpoint, params) 250 251 if response and "data" in response: 252 tweets = response["data"] 253 logger.info(f"Found {len(tweets)} mentions via search") 254 return tweets 255 else: 256 if response: 257 logger.info(f"No mentions found via search. Response: {response}") 258 else: 259 logger.warning("Search request failed") 260 return [] 261 262 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]: 263 """ 264 Get all tweets in a conversation thread up to a specific tweet ID. 265 266 Args: 267 conversation_id: The conversation ID to fetch (should be the original tweet ID) 268 use_cache: Whether to use cached data if available 269 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID) 270 271 Returns: 272 List of tweets in the conversation, ordered chronologically 273 """ 274 # Check cache first if enabled 275 if use_cache: 276 cached_data = get_cached_thread_context(conversation_id) 277 if cached_data: 278 return cached_data 279 280 # First, get the original tweet directly since it might not appear in conversation search 281 logger.debug(f"Getting thread context for conversation {conversation_id}") 282 original_tweet = None 283 try: 284 endpoint = f"/tweets/{conversation_id}" 285 params = { 286 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 287 "user.fields": "id,name,username", 288 "expansions": "author_id" 289 } 290 logger.debug(f"Fetching original tweet: GET {endpoint}") 291 response = self._make_request(endpoint, params) 292 if response and "data" in response: 293 original_tweet = response["data"] 294 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}") 295 except Exception as e: 296 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}") 297 298 # Then search for all tweets in this conversation 299 endpoint = "/tweets/search/recent" 300 params = { 301 "query": f"conversation_id:{conversation_id}", 302 "max_results": 100, # Get as many as possible 303 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 304 "user.fields": "id,name,username", 305 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id", 306 "sort_order": "recency" # Get newest first, we'll reverse later 307 } 308 309 # Add until_id parameter to exclude tweets after the mention being processed 310 if until_id: 311 params["until_id"] = until_id 312 logger.info(f"Using until_id={until_id} to exclude future tweets") 313 314 logger.info(f"Fetching thread context for conversation {conversation_id}") 315 logger.debug(f"Searching conversation: GET {endpoint} with query={params['query']}") 316 response = self._make_request(endpoint, params) 317 318 tweets = [] 319 users_data = {} 320 321 # Collect tweets from search 322 if response and "data" in response: 323 tweets.extend(response["data"]) 324 # Store user data for reference 325 if "includes" in response and "users" in response["includes"]: 326 for user in response["includes"]["users"]: 327 users_data[user["id"]] = user 328 329 # Add original tweet if we got it and it's not already in the list 330 if original_tweet: 331 tweet_ids = [t.get('id') for t in tweets] 332 if original_tweet.get('id') not in tweet_ids: 333 tweets.append(original_tweet) 334 logger.info("Added original tweet to thread context") 335 336 # Attempt to fill gaps by fetching referenced tweets that are missing 337 # This helps with X API's incomplete conversation search results 338 tweet_ids = set(t.get('id') for t in tweets) 339 missing_tweet_ids = set() 340 critical_missing_ids = set() 341 342 # Collect referenced tweet IDs, prioritizing critical ones 343 for tweet in tweets: 344 referenced_tweets = tweet.get('referenced_tweets', []) 345 for ref in referenced_tweets: 346 ref_id = ref.get('id') 347 ref_type = ref.get('type') 348 if ref_id and ref_id not in tweet_ids: 349 missing_tweet_ids.add(ref_id) 350 # Prioritize direct replies and quoted tweets over retweets 351 if ref_type in ['replied_to', 'quoted']: 352 critical_missing_ids.add(ref_id) 353 354 # For rate limit efficiency, only fetch critical missing tweets if we have many 355 if len(missing_tweet_ids) > 10: 356 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones") 357 missing_tweet_ids = critical_missing_ids 358 359 # Context sufficiency check - skip backfill if we already have enough context 360 if has_sufficient_context(tweets, missing_tweet_ids): 361 logger.info("Thread has sufficient context, skipping missing tweet backfill") 362 missing_tweet_ids = set() 363 364 # Fetch missing referenced tweets in batches (more rate-limit friendly) 365 if missing_tweet_ids: 366 missing_list = list(missing_tweet_ids) 367 368 # First, check cache for missing tweets 369 cached_tweets = get_cached_tweets(missing_list) 370 for tweet_id, cached_tweet in cached_tweets.items(): 371 if cached_tweet.get('conversation_id') == conversation_id: 372 tweets.append(cached_tweet) 373 tweet_ids.add(tweet_id) 374 logger.info(f"Retrieved missing tweet from cache: {tweet_id}") 375 376 # Add user data if available in cache 377 if cached_tweet.get('author_info'): 378 author_id = cached_tweet.get('author_id') 379 if author_id: 380 users_data[author_id] = cached_tweet['author_info'] 381 382 # Only fetch tweets that weren't found in cache 383 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets] 384 385 if uncached_ids: 386 batch_size = 100 # X API limit for bulk tweet lookup 387 388 for i in range(0, len(uncached_ids), batch_size): 389 batch_ids = uncached_ids[i:i + batch_size] 390 try: 391 endpoint = "/tweets" 392 params = { 393 "ids": ",".join(batch_ids), 394 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 395 "user.fields": "id,name,username", 396 "expansions": "author_id" 397 } 398 logger.debug(f"Batch fetching missing tweets: GET {endpoint} (ids: {len(batch_ids)} tweets)") 399 response = self._make_request(endpoint, params) 400 401 if response and "data" in response: 402 fetched_tweets = [] 403 batch_users_data = {} 404 405 for missing_tweet in response["data"]: 406 # Only add if it's actually part of this conversation 407 if missing_tweet.get('conversation_id') == conversation_id: 408 tweets.append(missing_tweet) 409 tweet_ids.add(missing_tweet.get('id')) 410 fetched_tweets.append(missing_tweet) 411 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}") 412 413 # Add user data if available 414 if "includes" in response and "users" in response["includes"]: 415 for user in response["includes"]["users"]: 416 users_data[user["id"]] = user 417 batch_users_data[user["id"]] = user 418 419 # Cache the newly fetched tweets 420 if fetched_tweets: 421 save_cached_tweets(fetched_tweets, batch_users_data) 422 423 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested") 424 425 # Handle partial success - log any missing tweets that weren't found 426 if response and "errors" in response: 427 for error in response["errors"]: 428 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}") 429 430 except Exception as e: 431 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}") 432 else: 433 logger.info(f"All {len(missing_list)} missing tweets found in cache") 434 435 if tweets: 436 # Filter out tweets that occur after until_id (if specified) 437 if until_id: 438 original_count = len(tweets) 439 # Convert until_id to int for comparison (Twitter IDs are sequential) 440 until_id_int = int(until_id) 441 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int] 442 filtered_count = len(tweets) 443 if original_count != filtered_count: 444 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}") 445 446 # Sort chronologically (oldest first) 447 tweets.sort(key=lambda x: x.get('created_at', '')) 448 logger.info(f"Retrieved {len(tweets)} tweets in thread") 449 450 thread_data = {"tweets": tweets, "users": users_data} 451 452 # Cache individual tweets from the thread for future backfill 453 save_cached_tweets(tweets, users_data) 454 455 # Cache the result 456 if use_cache: 457 save_cached_thread_context(conversation_id, thread_data) 458 459 return thread_data 460 else: 461 logger.warning("No tweets found for thread context") 462 return None 463 464 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 465 """ 466 Post a reply to a specific tweet. 467 468 Args: 469 reply_text: The text content of the reply 470 in_reply_to_tweet_id: The ID of the tweet to reply to 471 472 Returns: 473 Response data if successful, None if failed 474 """ 475 endpoint = "/tweets" 476 477 payload = { 478 "text": reply_text, 479 "reply": { 480 "in_reply_to_tweet_id": in_reply_to_tweet_id 481 } 482 } 483 484 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 485 logger.debug(f"Posting reply: POST {endpoint}") 486 result = self._make_request(endpoint, method="POST", data=payload) 487 488 if result: 489 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}") 490 return result 491 else: 492 logger.error("Failed to post reply") 493 return None 494 495 def post_tweet(self, tweet_text: str) -> Optional[Dict]: 496 """ 497 Post a standalone tweet (not a reply). 498 499 Args: 500 tweet_text: The text content of the tweet (max 280 characters) 501 502 Returns: 503 Response data if successful, None if failed 504 """ 505 endpoint = "/tweets" 506 507 # Validate tweet length 508 if len(tweet_text) > 280: 509 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)") 510 return None 511 512 payload = { 513 "text": tweet_text 514 } 515 516 logger.info(f"Attempting to post tweet with {self.auth_method} authentication") 517 logger.debug(f"Posting tweet: POST {endpoint}") 518 result = self._make_request(endpoint, method="POST", data=payload) 519 520 if result: 521 logger.info(f"Successfully posted tweet") 522 return result 523 else: 524 logger.error("Failed to post tweet") 525 return None 526 527 def get_user_info(self, fields: Optional[str] = None) -> Optional[Dict]: 528 """ 529 Get the authenticated user's information, using cached data when available. 530 This reduces API calls significantly since user info rarely changes. 531 532 Args: 533 fields: Optional comma-separated list of user fields to fetch 534 535 Returns: 536 User data dict if successful, None if failed 537 """ 538 # First try to get from cache 539 cached_user_info = get_cached_user_info() 540 if cached_user_info: 541 # Check if cached data has all requested fields 542 requested_fields = set(fields.split(',') if fields else ['id', 'username', 'name']) 543 cached_fields = set(cached_user_info.keys()) 544 if requested_fields.issubset(cached_fields): 545 return cached_user_info 546 547 # Cache miss, expired, or missing requested fields - fetch from API 548 logger.debug("Fetching fresh user info from /users/me API") 549 endpoint = "/users/me" 550 params = {"user.fields": fields or "id,username,name,description"} 551 552 response = self._make_request(endpoint, params=params) 553 if response and "data" in response: 554 user_data = response["data"] 555 # Cache the result for future use 556 save_cached_user_info(user_data) 557 return user_data 558 else: 559 logger.error("Failed to get user info from /users/me API") 560 return None 561 562 def get_username(self) -> Optional[str]: 563 """ 564 Get the authenticated user's username, using cached data when available. 565 This reduces API calls significantly since username rarely changes. 566 567 Returns: 568 Username string if successful, None if failed 569 """ 570 user_info = self.get_user_info("id,username,name") 571 return user_info.get("username") if user_info else None 572 573def load_x_config(config_path: str = "configs/x_config.yaml") -> Dict[str, Any]: 574 """Load complete X configuration from configs/x_config.yaml.""" 575 try: 576 with open(config_path, 'r') as f: 577 config = yaml.safe_load(f) 578 579 if not config: 580 raise ValueError(f"Empty or invalid configuration file: {config_path}") 581 582 # Validate required sections 583 x_config = config.get('x', {}) 584 letta_config = config.get('letta', {}) 585 586 if not x_config.get('api_key') or not x_config.get('user_id'): 587 raise ValueError("X API key and user_id must be configured in configs/x_config.yaml") 588 589 if not letta_config.get('api_key') or not letta_config.get('agent_id'): 590 raise ValueError("Letta API key and agent_id must be configured in configs/x_config.yaml") 591 592 return config 593 except Exception as e: 594 logger.error(f"Failed to load X configuration: {e}") 595 raise 596 597def get_x_letta_config(config_path: str = "configs/x_config.yaml") -> Dict[str, Any]: 598 """Get Letta configuration from X config file.""" 599 config = load_x_config(config_path) 600 return config['letta'] 601 602def create_x_client(config_path: str = "configs/x_config.yaml") -> XClient: 603 """Create and return an X client with configuration loaded from file.""" 604 config = load_x_config(config_path) 605 x_config = config['x'] 606 return XClient( 607 api_key=x_config['api_key'], 608 user_id=x_config['user_id'], 609 access_token=x_config.get('access_token'), 610 consumer_key=x_config.get('consumer_key'), 611 consumer_secret=x_config.get('consumer_secret'), 612 access_token_secret=x_config.get('access_token_secret') 613 ) 614 615def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str: 616 """ 617 Convert a mention object to a YAML string for better AI comprehension. 618 Similar to thread_to_yaml_string in bsky_utils.py 619 """ 620 # Extract relevant fields 621 simplified_mention = { 622 'id': mention.get('id'), 623 'text': mention.get('text'), 624 'author_id': mention.get('author_id'), 625 'created_at': mention.get('created_at'), 626 'in_reply_to_user_id': mention.get('in_reply_to_user_id') 627 } 628 629 # Add user information if available 630 if users_data and mention.get('author_id') in users_data: 631 user = users_data[mention.get('author_id')] 632 simplified_mention['author'] = { 633 'username': user.get('username'), 634 'name': user.get('name') 635 } 636 637 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 638 639def thread_to_yaml_string(thread_data: Dict) -> str: 640 """ 641 Convert X thread context to YAML string for AI comprehension. 642 Similar to Bluesky's thread_to_yaml_string function. 643 644 Args: 645 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 646 647 Returns: 648 YAML string representation of the thread 649 """ 650 if not thread_data or "tweets" not in thread_data: 651 return "conversation: []\n" 652 653 tweets = thread_data["tweets"] 654 users_data = thread_data.get("users", {}) 655 656 simplified_thread = { 657 "conversation": [] 658 } 659 660 for tweet in tweets: 661 # Get user info 662 author_id = tweet.get('author_id') 663 author_info = {} 664 if author_id and author_id in users_data: 665 user = users_data[author_id] 666 author_info = { 667 'username': user.get('username'), 668 'name': user.get('name') 669 } 670 671 # Build tweet object (simplified for AI consumption) 672 tweet_obj = { 673 'text': tweet.get('text'), 674 'created_at': tweet.get('created_at'), 675 'author': author_info, 676 'author_id': author_id # Include user ID for block management 677 } 678 679 simplified_thread["conversation"].append(tweet_obj) 680 681 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False) 682 683 684# X Caching and Queue System Functions 685 686def load_last_seen_id() -> Optional[str]: 687 """Load the last seen mention ID for incremental fetching.""" 688 if X_LAST_SEEN_FILE.exists(): 689 try: 690 with open(X_LAST_SEEN_FILE, 'r') as f: 691 data = json.load(f) 692 return data.get('last_seen_id') 693 except Exception as e: 694 logger.error(f"Error loading last seen ID: {e}") 695 return None 696 697def save_last_seen_id(mention_id: str): 698 """Save the last seen mention ID.""" 699 try: 700 X_QUEUE_DIR.mkdir(exist_ok=True) 701 with open(X_LAST_SEEN_FILE, 'w') as f: 702 json.dump({ 703 'last_seen_id': mention_id, 704 'updated_at': datetime.now().isoformat() 705 }, f) 706 logger.debug(f"Saved last seen ID: {mention_id}") 707 except Exception as e: 708 logger.error(f"Error saving last seen ID: {e}") 709 710def load_processed_mentions() -> set: 711 """Load the set of processed mention IDs.""" 712 if X_PROCESSED_MENTIONS_FILE.exists(): 713 try: 714 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 715 data = json.load(f) 716 # Keep only recent entries (last 10000) 717 if len(data) > 10000: 718 data = data[-10000:] 719 save_processed_mentions(set(data)) 720 return set(data) 721 except Exception as e: 722 logger.error(f"Error loading processed mentions: {e}") 723 return set() 724 725def save_processed_mentions(processed_set: set): 726 """Save the set of processed mention IDs.""" 727 try: 728 X_QUEUE_DIR.mkdir(exist_ok=True) 729 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 730 json.dump(list(processed_set), f) 731 except Exception as e: 732 logger.error(f"Error saving processed mentions: {e}") 733 734def load_downrank_users() -> Set[str]: 735 """Load the set of user IDs that should be downranked (responded to 10% of the time).""" 736 try: 737 if not X_DOWNRANK_USERS_FILE.exists(): 738 return set() 739 740 downrank_users = set() 741 with open(X_DOWNRANK_USERS_FILE, 'r') as f: 742 for line in f: 743 line = line.strip() 744 # Skip empty lines and comments 745 if line and not line.startswith('#'): 746 downrank_users.add(line) 747 748 logger.info(f"Loaded {len(downrank_users)} downrank users") 749 return downrank_users 750 except Exception as e: 751 logger.error(f"Error loading downrank users: {e}") 752 return set() 753 754def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool: 755 """ 756 Check if we should respond to a downranked user. 757 Returns True 10% of the time for downranked users, True 100% of the time for others. 758 """ 759 if user_id not in downrank_users: 760 return True 761 762 # 10% chance for downranked users 763 should_respond = random.random() < 0.1 764 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 765 return should_respond 766 767def save_mention_to_queue(mention: Dict): 768 """Save a mention to the queue directory for async processing.""" 769 try: 770 mention_id = mention.get('id') 771 if not mention_id: 772 logger.error("Mention missing ID, cannot queue") 773 return 774 775 # Check if already processed 776 processed_mentions = load_processed_mentions() 777 if mention_id in processed_mentions: 778 logger.debug(f"Mention {mention_id} already processed, skipping") 779 return 780 781 # Create queue directory 782 X_QUEUE_DIR.mkdir(exist_ok=True) 783 784 # Create filename using hash (similar to Bluesky system) 785 mention_str = json.dumps(mention, sort_keys=True) 786 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 787 filename = f"x_mention_{mention_hash}.json" 788 789 queue_file = X_QUEUE_DIR / filename 790 791 # Save mention data with enhanced debugging information 792 mention_data = { 793 'mention': mention, 794 'queued_at': datetime.now().isoformat(), 795 'type': 'x_mention', 796 # Debug info for conversation tracking 797 'debug_info': { 798 'mention_id': mention.get('id'), 799 'author_id': mention.get('author_id'), 800 'conversation_id': mention.get('conversation_id'), 801 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 802 'referenced_tweets': mention.get('referenced_tweets', []), 803 'text_preview': mention.get('text', '')[:200], 804 'created_at': mention.get('created_at'), 805 'public_metrics': mention.get('public_metrics', {}), 806 'context_annotations': mention.get('context_annotations', []) 807 } 808 } 809 810 with open(queue_file, 'w') as f: 811 json.dump(mention_data, f, indent=2) 812 813 logger.info(f"Queued X mention {mention_id} -> {filename}") 814 815 except Exception as e: 816 logger.error(f"Error saving mention to queue: {e}") 817 818# X Cache Functions 819def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 820 """Load cached thread context if available.""" 821 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 822 if cache_file.exists(): 823 try: 824 with open(cache_file, 'r') as f: 825 cached_data = json.load(f) 826 # Check if cache is recent (within 1 hour) 827 from datetime import datetime, timedelta 828 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 829 if datetime.now() - cached_time < timedelta(hours=1): 830 logger.info(f"Using cached thread context for {conversation_id}") 831 return cached_data.get('thread_data') 832 except Exception as e: 833 logger.warning(f"Error loading cached thread context: {e}") 834 return None 835 836def save_cached_thread_context(conversation_id: str, thread_data: Dict): 837 """Save thread context to cache.""" 838 try: 839 X_CACHE_DIR.mkdir(exist_ok=True) 840 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 841 842 cache_data = { 843 'conversation_id': conversation_id, 844 'thread_data': thread_data, 845 'cached_at': datetime.now().isoformat() 846 } 847 848 with open(cache_file, 'w') as f: 849 json.dump(cache_data, f, indent=2) 850 851 logger.debug(f"Cached thread context for {conversation_id}") 852 except Exception as e: 853 logger.error(f"Error caching thread context: {e}") 854 855def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]: 856 """ 857 Load cached individual tweets if available. 858 Returns dict mapping tweet_id -> tweet_data for found tweets. 859 """ 860 cached_tweets = {} 861 862 for tweet_id in tweet_ids: 863 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 864 if cache_file.exists(): 865 try: 866 with open(cache_file, 'r') as f: 867 cached_data = json.load(f) 868 869 # Use longer cache times for older tweets (24 hours vs 1 hour) 870 from datetime import datetime, timedelta 871 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 872 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '') 873 874 # Parse tweet creation time to determine age 875 try: 876 from dateutil.parser import parse 877 tweet_age = datetime.now() - parse(tweet_created) 878 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1) 879 except: 880 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails 881 882 if datetime.now() - cached_time < cache_duration: 883 cached_tweets[tweet_id] = cached_data.get('tweet_data') 884 logger.debug(f"Using cached tweet {tweet_id}") 885 886 except Exception as e: 887 logger.warning(f"Error loading cached tweet {tweet_id}: {e}") 888 889 return cached_tweets 890 891def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None): 892 """Save individual tweets to cache for future reuse.""" 893 try: 894 X_CACHE_DIR.mkdir(exist_ok=True) 895 896 for tweet in tweets_data: 897 tweet_id = tweet.get('id') 898 if not tweet_id: 899 continue 900 901 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 902 903 # Include user data if available 904 tweet_with_user = tweet.copy() 905 if users_data and tweet.get('author_id') in users_data: 906 tweet_with_user['author_info'] = users_data[tweet.get('author_id')] 907 908 cache_data = { 909 'tweet_id': tweet_id, 910 'tweet_data': tweet_with_user, 911 'cached_at': datetime.now().isoformat() 912 } 913 914 with open(cache_file, 'w') as f: 915 json.dump(cache_data, f, indent=2) 916 917 logger.debug(f"Cached individual tweet {tweet_id}") 918 919 except Exception as e: 920 logger.error(f"Error caching individual tweets: {e}") 921 922def get_cached_user_info() -> Optional[Dict]: 923 """Load cached user info if available and not expired.""" 924 cache_file = X_CACHE_DIR / "user_info.json" 925 if cache_file.exists(): 926 try: 927 with open(cache_file, 'r') as f: 928 cached_data = json.load(f) 929 # Check if cache is recent (within 24 hours) 930 from datetime import datetime, timedelta 931 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 932 if datetime.now() - cached_time < timedelta(hours=24): 933 logger.debug("Using cached user info") 934 return cached_data.get('data') 935 else: 936 logger.debug("Cached user info expired (>24 hours old)") 937 except Exception as e: 938 logger.warning(f"Error loading cached user info: {e}") 939 return None 940 941def save_cached_user_info(user_data: Dict): 942 """Save user info to cache.""" 943 try: 944 X_CACHE_DIR.mkdir(exist_ok=True) 945 cache_file = X_CACHE_DIR / "user_info.json" 946 947 from datetime import datetime 948 cache_data = { 949 'data': user_data, 950 'cached_at': datetime.now().isoformat() 951 } 952 953 with open(cache_file, 'w') as f: 954 json.dump(cache_data, f, indent=2) 955 956 logger.debug(f"Cached user info: {user_data.get('username')}") 957 958 except Exception as e: 959 logger.error(f"Error caching user info: {e}") 960 961def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 962 """ 963 Determine if we have sufficient context to skip backfilling missing tweets. 964 965 Args: 966 tweets: List of tweets already in the thread 967 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch 968 969 Returns: 970 True if context is sufficient, False if backfill is needed 971 """ 972 # If no missing tweets, context is sufficient 973 if not missing_tweet_ids: 974 return True 975 976 # If we have a substantial conversation (5+ tweets), likely sufficient 977 if len(tweets) >= 5: 978 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient") 979 return True 980 981 # If only a few missing tweets and we have some context, might be enough 982 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3: 983 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient") 984 return True 985 986 # Check if we have conversational flow (mentions between users) 987 has_conversation_flow = False 988 for tweet in tweets: 989 text = tweet.get('text', '').lower() 990 # Look for mentions, replies, or conversational indicators 991 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1: 992 has_conversation_flow = True 993 break 994 995 # If we have clear conversational flow and reasonable length, sufficient 996 if has_conversation_flow and len(tweets) >= 2: 997 logger.debug("Thread has conversational flow, considering sufficient") 998 return True 999 1000 # Otherwise, we need to backfill 1001 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow") 1002 return False 1003 1004def fetch_and_queue_mentions(username: str) -> int: 1005 """ 1006 Single-pass function to fetch new mentions and queue them. 1007 Returns number of new mentions found. 1008 """ 1009 try: 1010 client = create_x_client() 1011 1012 # Load last seen ID for incremental fetching 1013 last_seen_id = load_last_seen_id() 1014 1015 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 1016 1017 # Search for mentions - this calls GET /2/tweets/search/recent 1018 logger.debug(f"Calling search_mentions API for @{username}") 1019 mentions = client.search_mentions( 1020 username=username, 1021 since_id=last_seen_id, 1022 max_results=100 # Get as many as possible 1023 ) 1024 1025 if not mentions: 1026 logger.info("No new mentions found") 1027 return 0 1028 1029 # Process mentions (newest first, so reverse to process oldest first) 1030 mentions.reverse() 1031 new_count = 0 1032 1033 for mention in mentions: 1034 save_mention_to_queue(mention) 1035 new_count += 1 1036 1037 # Update last seen ID to the most recent mention 1038 if mentions: 1039 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 1040 save_last_seen_id(most_recent_id) 1041 1042 logger.info(f"Queued {new_count} new X mentions") 1043 return new_count 1044 1045 except Exception as e: 1046 logger.error(f"Error fetching and queuing mentions: {e}") 1047 return 0 1048 1049# Simple test function 1050def get_my_user_info(): 1051 """Get the authenticated user's information to find correct user ID.""" 1052 try: 1053 client = create_x_client() 1054 1055 # Get authenticated user info using cached method 1056 print("Fetching authenticated user information...") 1057 user_data = client.get_user_info("id,name,username,description") 1058 1059 if user_data: 1060 print(f"✅ Found authenticated user:") 1061 print(f" ID: {user_data.get('id')}") 1062 print(f" Username: @{user_data.get('username')}") 1063 print(f" Name: {user_data.get('name')}") 1064 print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 1065 print(f"\n🔧 Update your x_config.yaml with:") 1066 print(f" user_id: \"{user_data.get('id')}\"") 1067 return user_data 1068 else: 1069 print("❌ Failed to get user information") 1070 print(f"Response: {response}") 1071 return None 1072 1073 except Exception as e: 1074 print(f"Error getting user info: {e}") 1075 return None 1076 1077def test_search_mentions(): 1078 """Test the search-based mention detection.""" 1079 try: 1080 client = create_x_client() 1081 1082 # First get our username 1083 username = client.get_username() 1084 if not username: 1085 print("❌ Could not get username") 1086 return 1087 print(f"🔍 Searching for mentions of @{username}") 1088 1089 mentions = client.search_mentions(username, max_results=5) 1090 1091 if mentions: 1092 print(f"✅ Found {len(mentions)} mentions via search:") 1093 for mention in mentions: 1094 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 1095 else: 1096 print("No mentions found via search") 1097 1098 except Exception as e: 1099 print(f"Search test failed: {e}") 1100 1101def test_fetch_and_queue(): 1102 """Test the single-pass fetch and queue function.""" 1103 try: 1104 client = create_x_client() 1105 1106 # Get our username 1107 username = client.get_username() 1108 if not username: 1109 print("❌ Could not get username") 1110 return 1111 print(f"🔄 Fetching and queueing mentions for @{username}") 1112 1113 # Show current state 1114 last_seen = load_last_seen_id() 1115 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 1116 1117 # Fetch and queue 1118 new_count = fetch_and_queue_mentions(username) 1119 1120 if new_count > 0: 1121 print(f"✅ Queued {new_count} new mentions") 1122 print(f"📁 Check ./x_queue/ directory for queued mentions") 1123 1124 # Show updated state 1125 new_last_seen = load_last_seen_id() 1126 print(f"📍 Updated last seen ID: {new_last_seen}") 1127 else: 1128 print("ℹ️ No new mentions to queue") 1129 1130 except Exception as e: 1131 print(f"Fetch and queue test failed: {e}") 1132 1133def test_thread_context(): 1134 """Test thread context retrieval from a queued mention.""" 1135 try: 1136 import json 1137 1138 # Find a queued mention file 1139 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1140 if not queue_files: 1141 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1142 return 1143 1144 # Read the first mention 1145 mention_file = queue_files[0] 1146 with open(mention_file, 'r') as f: 1147 mention_data = json.load(f) 1148 1149 mention = mention_data['mention'] 1150 print(f"📄 Using mention: {mention.get('id')}") 1151 print(f"📝 Text: {mention.get('text')}") 1152 1153 # Check if it has a conversation_id 1154 conversation_id = mention.get('conversation_id') 1155 if not conversation_id: 1156 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.") 1157 return 1158 1159 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1160 1161 # Get thread context 1162 client = create_x_client() 1163 thread_data = client.get_thread_context(conversation_id) 1164 1165 if thread_data: 1166 tweets = thread_data.get('tweets', []) 1167 print(f"✅ Retrieved thread with {len(tweets)} tweets") 1168 1169 # Convert to YAML 1170 yaml_thread = thread_to_yaml_string(thread_data) 1171 1172 # Save thread context for inspection 1173 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml" 1174 with open(thread_file, 'w') as f: 1175 f.write(yaml_thread) 1176 1177 print(f"💾 Saved thread context to: {thread_file}") 1178 print("\n📋 Thread preview:") 1179 print(yaml_thread) 1180 else: 1181 print("❌ Failed to retrieve thread context") 1182 1183 except Exception as e: 1184 print(f"Thread context test failed: {e}") 1185 1186def test_letta_integration(agent_id: str = None): 1187 """Test sending X thread context to Letta agent.""" 1188 try: 1189 from letta_client import Letta 1190 import json 1191 import yaml 1192 1193 # Load X config to access letta section 1194 try: 1195 x_config = load_x_config() 1196 letta_config = x_config.get('letta', {}) 1197 api_key = letta_config.get('api_key') 1198 config_agent_id = letta_config.get('agent_id') 1199 1200 # Use agent_id from config if not provided as parameter 1201 if not agent_id: 1202 if config_agent_id: 1203 agent_id = config_agent_id 1204 print(f"ℹ️ Using agent_id from config: {agent_id}") 1205 else: 1206 print("❌ No agent_id found in x_config.yaml") 1207 print("Expected config structure:") 1208 print(" letta:") 1209 print(" agent_id: your-agent-id") 1210 return 1211 else: 1212 print(f"ℹ️ Using provided agent_id: {agent_id}") 1213 1214 if not api_key: 1215 # Try loading from environment as fallback 1216 import os 1217 api_key = os.getenv('LETTA_API_KEY') 1218 if not api_key: 1219 print("❌ LETTA_API_KEY not found in x_config.yaml or environment") 1220 print("Expected config structure:") 1221 print(" letta:") 1222 print(" api_key: your-letta-api-key") 1223 return 1224 else: 1225 print("ℹ️ Using LETTA_API_KEY from environment") 1226 else: 1227 print("ℹ️ Using LETTA_API_KEY from x_config.yaml") 1228 1229 except Exception as e: 1230 print(f"❌ Error loading config: {e}") 1231 return 1232 1233 letta_client = Letta(token=api_key, timeout=600) 1234 print(f"🤖 Connected to Letta, using agent: {agent_id}") 1235 1236 # Find a queued mention file 1237 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1238 if not queue_files: 1239 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1240 return 1241 1242 # Read the first mention 1243 mention_file = queue_files[0] 1244 with open(mention_file, 'r') as f: 1245 mention_data = json.load(f) 1246 1247 mention = mention_data['mention'] 1248 conversation_id = mention.get('conversation_id') 1249 1250 if not conversation_id: 1251 print("❌ No conversation_id found in mention.") 1252 return 1253 1254 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1255 1256 # Get thread context 1257 x_client = create_x_client() 1258 thread_data = x_client.get_thread_context(conversation_id) 1259 1260 if not thread_data: 1261 print("❌ Failed to retrieve thread context") 1262 return 1263 1264 # Convert to YAML 1265 yaml_thread = thread_to_yaml_string(thread_data) 1266 1267 # Create prompt for the agent 1268 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 1269 1270Here is the thread context: 1271 1272{yaml_thread} 1273 1274Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 1275 1276 prompt_char_count = len(prompt) 1277 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars") 1278 1279 # Print the prompt in a rich panel 1280 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue")) 1281 1282 # Send to Letta agent using streaming 1283 message_stream = letta_client.agents.messages.create_stream( 1284 agent_id=agent_id, 1285 messages=[{"role": "user", "content": prompt}], 1286 stream_tokens=False, 1287 max_steps=10 1288 ) 1289 1290 print("🔄 Streaming response from agent...") 1291 response_text = "" 1292 1293 for chunk in message_stream: 1294 print(chunk) 1295 if hasattr(chunk, 'message_type'): 1296 if chunk.message_type == 'assistant_message': 1297 print(f"🤖 Agent response: {chunk.content}") 1298 response_text = chunk.content 1299 elif chunk.message_type == 'reasoning_message': 1300 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 1301 elif chunk.message_type == 'tool_call_message': 1302 print(f"🔧 Agent tool call: {chunk.tool_call.name}") 1303 1304 if response_text: 1305 print(f"\n✅ Agent generated response:") 1306 print(f"📝 Response: {response_text}") 1307 else: 1308 print("❌ No response generated by agent") 1309 1310 except Exception as e: 1311 print(f"Letta integration test failed: {e}") 1312 import traceback 1313 traceback.print_exc() 1314 1315def test_x_client(): 1316 """Test the X client by fetching mentions.""" 1317 try: 1318 client = create_x_client() 1319 mentions = client.get_mentions(max_results=5) 1320 1321 if mentions: 1322 print(f"Successfully retrieved {len(mentions)} mentions:") 1323 for mention in mentions: 1324 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...") 1325 else: 1326 print("No mentions retrieved") 1327 1328 except Exception as e: 1329 print(f"Test failed: {e}") 1330 1331def reply_to_cameron_post(): 1332 """ 1333 Reply to Cameron's specific X post. 1334 1335 NOTE: This requires OAuth User Context authentication, not Bearer token. 1336 Current Bearer token is Application-Only which can't post. 1337 """ 1338 try: 1339 client = create_x_client() 1340 1341 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1342 cameron_post_id = "1950690566909710618" 1343 1344 # Simple reply message 1345 reply_text = "Hello from void! 🤖 Testing X integration." 1346 1347 print(f"Attempting to reply to post {cameron_post_id}") 1348 print(f"Reply text: {reply_text}") 1349 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1350 print("Posting requires OAuth User Context authentication") 1351 1352 result = client.post_reply(reply_text, cameron_post_id) 1353 1354 if result: 1355 print(f"✅ Successfully posted reply!") 1356 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1357 else: 1358 print("❌ Failed to post reply (expected with current auth)") 1359 1360 except Exception as e: 1361 print(f"Reply failed: {e}") 1362 1363def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1364 """ 1365 Process an X mention and generate a reply using the Letta agent. 1366 Similar to bsky.py process_mention but for X/Twitter. 1367 1368 Args: 1369 void_agent: The Letta agent instance 1370 x_client: The X API client 1371 mention_data: The mention data dictionary 1372 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1373 testing_mode: If True, don't actually post to X 1374 1375 Returns: 1376 True: Successfully processed, remove from queue 1377 False: Failed but retryable, keep in queue 1378 None: Failed with non-retryable error, move to errors directory 1379 "no_reply": No reply was generated, move to no_reply directory 1380 """ 1381 try: 1382 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1383 1384 # Extract mention details 1385 if isinstance(mention_data, dict): 1386 # Handle both raw mention and queued mention formats 1387 if 'mention' in mention_data: 1388 mention = mention_data['mention'] 1389 else: 1390 mention = mention_data 1391 else: 1392 mention = mention_data 1393 1394 mention_id = mention.get('id') 1395 mention_text = mention.get('text', '') 1396 author_id = mention.get('author_id') 1397 conversation_id = mention.get('conversation_id') 1398 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1399 referenced_tweets = mention.get('referenced_tweets', []) 1400 1401 # Check downrank list - only respond to downranked users 10% of the time 1402 downrank_users = load_downrank_users() 1403 if not should_respond_to_downranked_user(str(author_id), downrank_users): 1404 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection") 1405 return "no_reply" 1406 1407 # Enhanced conversation tracking for debug - especially important for Grok handling 1408 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1409 logger.info(f" Author ID: {author_id}") 1410 logger.info(f" Conversation ID: {conversation_id}") 1411 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1412 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1413 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1414 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1415 logger.info(f" Text preview: {mention_text[:100]}...") 1416 1417 if not conversation_id: 1418 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues") 1419 return None 1420 1421 # Get thread context with caching enabled for efficiency 1422 # Use mention_id as until_id to exclude tweets that occurred after this mention 1423 try: 1424 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id) 1425 if not thread_data: 1426 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1427 return False 1428 1429 # If this mention references a specific tweet, ensure we have that tweet in context 1430 if referenced_tweets: 1431 for ref in referenced_tweets: 1432 if ref.get('type') == 'replied_to': 1433 ref_id = ref.get('id') 1434 # Check if the referenced tweet is in our thread data 1435 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1436 if ref_id and ref_id not in thread_tweet_ids: 1437 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1438 try: 1439 # Fetch the missing referenced tweet directly 1440 endpoint = f"/tweets/{ref_id}" 1441 params = { 1442 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1443 "user.fields": "id,name,username", 1444 "expansions": "author_id" 1445 } 1446 logger.debug(f"Fetching individual missing tweet: GET {endpoint}") 1447 response = x_client._make_request(endpoint, params) 1448 if response and "data" in response: 1449 missing_tweet = response["data"] 1450 if missing_tweet.get('conversation_id') == conversation_id: 1451 # Add to thread data 1452 if 'tweets' not in thread_data: 1453 thread_data['tweets'] = [] 1454 thread_data['tweets'].append(missing_tweet) 1455 1456 # Add user data if available 1457 if "includes" in response and "users" in response["includes"]: 1458 if 'users' not in thread_data: 1459 thread_data['users'] = {} 1460 for user in response["includes"]["users"]: 1461 thread_data['users'][user["id"]] = user 1462 1463 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1464 else: 1465 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}") 1466 except Exception as e: 1467 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1468 1469 # Enhanced thread context debugging 1470 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1471 thread_posts = thread_data.get('tweets', []) 1472 thread_users = thread_data.get('users', {}) 1473 logger.info(f" Posts in thread: {len(thread_posts)}") 1474 logger.info(f" Users in thread: {len(thread_users)}") 1475 1476 # Log thread participants for Grok detection 1477 for user_id, user_info in thread_users.items(): 1478 username = user_info.get('username', 'unknown') 1479 name = user_info.get('name', 'Unknown') 1480 is_verified = user_info.get('verified', False) 1481 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1482 1483 # Special logging for Grok or AI-related users 1484 if 'grok' in username.lower() or 'grok' in name.lower(): 1485 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1486 1487 # Log conversation structure 1488 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1489 post_id = post.get('id') 1490 post_author = post.get('author_id') 1491 post_text = post.get('text', '')[:50] 1492 is_reply = 'in_reply_to_user_id' in post 1493 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1494 1495 except Exception as e: 1496 logger.error(f"❌ Error getting thread context: {e}") 1497 return False 1498 1499 # Convert to YAML string 1500 thread_context = thread_to_yaml_string(thread_data) 1501 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1502 1503 # Save comprehensive conversation data for debugging 1504 try: 1505 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1506 debug_dir.mkdir(parents=True, exist_ok=True) 1507 1508 # Save raw thread data (JSON) 1509 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 1510 json.dump(thread_data, f, indent=2) 1511 1512 # Save YAML thread context 1513 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1514 f.write(thread_context) 1515 1516 # Save mention processing debug info 1517 debug_info = { 1518 'processed_at': datetime.now().isoformat(), 1519 'mention_id': mention_id, 1520 'conversation_id': conversation_id, 1521 'author_id': author_id, 1522 'in_reply_to_user_id': in_reply_to_user_id, 1523 'referenced_tweets': referenced_tweets, 1524 'thread_stats': { 1525 'total_posts': len(thread_posts), 1526 'total_users': len(thread_users), 1527 'yaml_length': len(thread_context) 1528 }, 1529 'users_in_conversation': { 1530 user_id: { 1531 'username': user_info.get('username'), 1532 'name': user_info.get('name'), 1533 'verified': user_info.get('verified', False), 1534 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1535 } 1536 for user_id, user_info in thread_users.items() 1537 } 1538 } 1539 1540 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1541 json.dump(debug_info, f, indent=2) 1542 1543 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1544 1545 except Exception as debug_error: 1546 logger.warning(f"Failed to save debug data: {debug_error}") 1547 # Continue processing even if debug save fails 1548 1549 # Check for #voidstop 1550 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1551 logger.info("Found #voidstop, skipping this mention") 1552 return True 1553 1554 # Create prompt for Letta agent 1555 author_info = thread_data.get('users', {}).get(author_id, {}) 1556 author_username = author_info.get('username', 'unknown') 1557 author_name = author_info.get('name', author_username) 1558 1559 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1560 1561MOST RECENT POST (the mention you're responding to): 1562"{mention_text}" 1563 1564FULL THREAD CONTEXT: 1565```yaml 1566{thread_context} 1567``` 1568 1569The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 1570 1571If you need to update user information, use the x_user_* tools. 1572 1573To reply, use the add_post_to_x_thread tool: 1574- Each call creates one post (max 280 characters) 1575- For most responses, a single call is sufficient 1576- Only use multiple calls for threaded replies when: 1577 * The topic requires extended explanation that cannot fit in 280 characters 1578 * You're explicitly asked for a detailed/long response 1579 * The conversation naturally benefits from a structured multi-part answer 1580- Avoid unnecessary threads - be concise when possible""" 1581 1582 # Log mention processing 1583 title = f"X MENTION FROM @{author_username}" 1584 print(f"\n{title}") 1585 print(f" {'' * len(title)}") 1586 for line in mention_text.split('\n'): 1587 print(f" {line}") 1588 1589 # Send to Letta agent 1590 from letta_client import Letta 1591 1592 config = get_x_letta_config() 1593 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1594 1595 prompt_char_count = len(prompt) 1596 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars") 1597 1598 try: 1599 # Use streaming to avoid timeout errors 1600 message_stream = letta_client.agents.messages.create_stream( 1601 agent_id=void_agent.id, 1602 messages=[{"role": "user", "content": prompt}], 1603 stream_tokens=False, 1604 max_steps=100 1605 ) 1606 1607 # Collect streaming response (simplified version of bsky.py logic) 1608 all_messages = [] 1609 for chunk in message_stream: 1610 if hasattr(chunk, 'message_type'): 1611 if chunk.message_type == 'reasoning_message': 1612 print("\n◆ Reasoning") 1613 print(" ─────────") 1614 for line in chunk.reasoning.split('\n'): 1615 print(f" {line}") 1616 elif chunk.message_type == 'tool_call_message': 1617 tool_name = chunk.tool_call.name 1618 if tool_name == 'add_post_to_x_thread': 1619 try: 1620 args = json.loads(chunk.tool_call.arguments) 1621 text = args.get('text', '') 1622 if text: 1623 print("\n✎ X Post") 1624 print(" ────────") 1625 for line in text.split('\n'): 1626 print(f" {line}") 1627 except: 1628 pass 1629 elif tool_name == 'halt_activity': 1630 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1631 if queue_filepath and queue_filepath.exists(): 1632 queue_filepath.unlink() 1633 logger.info(f"Deleted queue file: {queue_filepath.name}") 1634 logger.info("=== X BOT TERMINATED BY AGENT ===") 1635 exit(0) 1636 elif chunk.message_type == 'tool_return_message': 1637 tool_name = chunk.name 1638 status = chunk.status 1639 if status == 'success' and tool_name == 'add_post_to_x_thread': 1640 print("\n✓ X Post Queued") 1641 print(" ──────────────") 1642 print(" Post queued successfully") 1643 elif chunk.message_type == 'assistant_message': 1644 print("\n▶ Assistant Response") 1645 print(" ──────────────────") 1646 for line in chunk.content.split('\n'): 1647 print(f" {line}") 1648 1649 all_messages.append(chunk) 1650 if str(chunk) == 'done': 1651 break 1652 1653 # Convert streaming response for compatibility 1654 message_response = type('StreamingResponse', (), { 1655 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1656 })() 1657 1658 except Exception as api_error: 1659 logger.error(f"Letta API error: {api_error}") 1660 raise 1661 1662 # Extract successful add_post_to_x_thread tool calls 1663 reply_candidates = [] 1664 tool_call_results = {} 1665 ignored_notification = False 1666 ack_note = None # Track any note from annotate_ack tool 1667 1668 # First pass: collect tool return statuses 1669 for message in message_response.messages: 1670 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1671 if message.name == 'add_post_to_x_thread': 1672 tool_call_results[message.tool_call_id] = message.status 1673 elif message.name == 'ignore_notification': 1674 if message.status == 'success': 1675 ignored_notification = True 1676 logger.info("🚫 X notification ignored") 1677 1678 # Second pass: collect successful tool calls 1679 for message in message_response.messages: 1680 if hasattr(message, 'tool_call') and message.tool_call: 1681 # Collect annotate_ack tool calls 1682 if message.tool_call.name == 'annotate_ack': 1683 try: 1684 args = json.loads(message.tool_call.arguments) 1685 note = args.get('note', '') 1686 if note: 1687 ack_note = note 1688 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1689 except json.JSONDecodeError as e: 1690 logger.error(f"Failed to parse annotate_ack arguments: {e}") 1691 1692 # Collect add_post_to_x_thread tool calls - only if they were successful 1693 elif message.tool_call.name == 'add_post_to_x_thread': 1694 tool_call_id = message.tool_call.tool_call_id 1695 tool_status = tool_call_results.get(tool_call_id, 'unknown') 1696 1697 if tool_status == 'success': 1698 try: 1699 args = json.loads(message.tool_call.arguments) 1700 reply_text = args.get('text', '') 1701 if reply_text: 1702 reply_candidates.append(reply_text) 1703 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1704 except json.JSONDecodeError as e: 1705 logger.error(f"Failed to parse tool call arguments: {e}") 1706 1707 # Save agent response data to debug folder 1708 try: 1709 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1710 1711 # Save complete agent interaction 1712 agent_response_data = { 1713 'processed_at': datetime.now().isoformat(), 1714 'mention_id': mention_id, 1715 'conversation_id': conversation_id, 1716 'prompt_sent': prompt, 1717 'reply_candidates': reply_candidates, 1718 'ignored_notification': ignored_notification, 1719 'ack_note': ack_note, 1720 'tool_call_results': tool_call_results, 1721 'all_messages': [] 1722 } 1723 1724 # Convert messages to serializable format 1725 for message in message_response.messages: 1726 msg_data = { 1727 'message_type': getattr(message, 'message_type', 'unknown'), 1728 'content': getattr(message, 'content', ''), 1729 'reasoning': getattr(message, 'reasoning', ''), 1730 'status': getattr(message, 'status', ''), 1731 'name': getattr(message, 'name', ''), 1732 } 1733 1734 if hasattr(message, 'tool_call') and message.tool_call: 1735 msg_data['tool_call'] = { 1736 'name': message.tool_call.name, 1737 'arguments': message.tool_call.arguments, 1738 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '') 1739 } 1740 1741 agent_response_data['all_messages'].append(msg_data) 1742 1743 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f: 1744 json.dump(agent_response_data, f, indent=2) 1745 1746 logger.info(f"💾 Saved agent response debug data") 1747 1748 except Exception as debug_error: 1749 logger.warning(f"Failed to save agent response debug data: {debug_error}") 1750 1751 # Handle conflicts 1752 if reply_candidates and ignored_notification: 1753 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1754 return False 1755 1756 if reply_candidates: 1757 # Post replies to X 1758 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1759 1760 if len(reply_candidates) == 1: 1761 content = reply_candidates[0] 1762 title = f"Reply to @{author_username}" 1763 else: 1764 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1765 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1766 1767 print(f"\n{title}") 1768 print(f" {'' * len(title)}") 1769 for line in content.split('\n'): 1770 print(f" {line}") 1771 1772 if testing_mode: 1773 logger.info("TESTING MODE: Skipping actual X post") 1774 return True 1775 else: 1776 # Post to X using thread approach 1777 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1778 if success: 1779 logger.info(f"Successfully replied to @{author_username} on X") 1780 1781 # Acknowledge the post we're replying to 1782 try: 1783 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1784 if ack_result: 1785 if ack_note: 1786 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1787 else: 1788 logger.info(f"Successfully acknowledged X post from @{author_username}") 1789 else: 1790 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1791 except Exception as e: 1792 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1793 # Don't fail the entire operation if acknowledgment fails 1794 1795 return True 1796 else: 1797 logger.error(f"Failed to send reply to @{author_username} on X") 1798 return False 1799 else: 1800 if ignored_notification: 1801 logger.info(f"X mention from @{author_username} was explicitly ignored") 1802 return "ignored" 1803 else: 1804 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1805 return False # Keep in queue for retry instead of removing 1806 1807 except Exception as e: 1808 logger.error(f"Error processing X mention: {e}") 1809 return False 1810 1811def acknowledge_x_post(x_client, post_id, note=None): 1812 """ 1813 Acknowledge an X post that we replied to. 1814 Uses the same Bluesky client and uploads to the void data repository on atproto, 1815 just like Bluesky acknowledgments. 1816 1817 Args: 1818 x_client: XClient instance (not used, kept for compatibility) 1819 post_id: The X post ID we're acknowledging 1820 note: Optional note to include with the acknowledgment 1821 1822 Returns: 1823 True if successful, False otherwise 1824 """ 1825 try: 1826 # Use Bluesky client to upload acks to the void data repository on atproto 1827 bsky_client = bsky_utils.default_login() 1828 1829 # Create a synthetic URI and CID for the X post 1830 # X posts don't have atproto URIs/CIDs, so we create identifiers 1831 post_uri = f"x://twitter.com/post/{post_id}" 1832 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1833 1834 # Use the same acknowledge_post function as Bluesky 1835 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1836 1837 if ack_result: 1838 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1839 return True 1840 else: 1841 logger.error(f"Failed to acknowledge X post {post_id}") 1842 return False 1843 1844 except Exception as e: 1845 logger.error(f"Error acknowledging X post {post_id}: {e}") 1846 return False 1847 1848def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1849 """ 1850 Post a series of replies to X, threading them properly. 1851 1852 Args: 1853 x_client: XClient instance 1854 in_reply_to_tweet_id: The original tweet ID to reply to 1855 reply_messages: List of reply text strings 1856 1857 Returns: 1858 True if successful, False otherwise 1859 """ 1860 try: 1861 current_reply_id = in_reply_to_tweet_id 1862 1863 for i, reply_text in enumerate(reply_messages): 1864 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1865 1866 result = x_client.post_reply(reply_text, current_reply_id) 1867 1868 if result and 'data' in result: 1869 new_tweet_id = result['data']['id'] 1870 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1871 # For threading, the next reply should reply to this one 1872 current_reply_id = new_tweet_id 1873 else: 1874 logger.error(f"Failed to post X reply {i+1}") 1875 return False 1876 1877 return True 1878 1879 except Exception as e: 1880 logger.error(f"Error posting X thread replies: {e}") 1881 return False 1882 1883def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1884 """ 1885 Load and process all X mentions from the queue. 1886 Similar to bsky.py load_and_process_queued_notifications but for X. 1887 """ 1888 try: 1889 # Get all X mention files in queue directory 1890 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1891 1892 if not queue_files: 1893 return 1894 1895 # Load file metadata and sort by creation time (chronological order) 1896 file_metadata = [] 1897 for filepath in queue_files: 1898 try: 1899 with open(filepath, 'r') as f: 1900 queue_data = json.load(f) 1901 mention_data = queue_data.get('mention', queue_data) 1902 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing 1903 file_metadata.append((created_at, filepath)) 1904 except Exception as e: 1905 logger.warning(f"Error reading queue file {filepath.name}: {e}") 1906 # Add with default timestamp so it still gets processed 1907 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath)) 1908 1909 # Sort by creation time (oldest first) 1910 file_metadata.sort(key=lambda x: x[0]) 1911 1912 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order") 1913 1914 for i, (created_at, filepath) in enumerate(file_metadata, 1): 1915 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})") 1916 1917 try: 1918 # Load mention data 1919 with open(filepath, 'r') as f: 1920 queue_data = json.load(f) 1921 1922 mention_data = queue_data.get('mention', queue_data) 1923 1924 # Process the mention 1925 success = process_x_mention(void_agent, x_client, mention_data, 1926 queue_filepath=filepath, testing_mode=testing_mode) 1927 1928 except XRateLimitError: 1929 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning") 1930 break 1931 1932 except Exception as e: 1933 logger.error(f"Error processing X queue file {filepath.name}: {e}") 1934 continue 1935 1936 # Handle file based on processing result 1937 if success: 1938 if testing_mode: 1939 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1940 else: 1941 filepath.unlink() 1942 logger.info(f"Successfully processed and removed X file: {filepath.name}") 1943 1944 # Mark as processed 1945 processed_mentions = load_processed_mentions() 1946 processed_mentions.add(mention_data.get('id')) 1947 save_processed_mentions(processed_mentions) 1948 1949 elif success is None: # Move to error directory 1950 error_dir = X_QUEUE_DIR / "errors" 1951 error_dir.mkdir(exist_ok=True) 1952 error_path = error_dir / filepath.name 1953 filepath.rename(error_path) 1954 logger.warning(f"Moved X file {filepath.name} to errors directory") 1955 1956 elif success == "no_reply": # Move to no_reply directory 1957 no_reply_dir = X_QUEUE_DIR / "no_reply" 1958 no_reply_dir.mkdir(exist_ok=True) 1959 no_reply_path = no_reply_dir / filepath.name 1960 filepath.rename(no_reply_path) 1961 logger.info(f"Moved X file {filepath.name} to no_reply directory") 1962 1963 elif success == "ignored": # Delete ignored notifications 1964 filepath.unlink() 1965 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1966 1967 else: 1968 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 1969 1970 except Exception as e: 1971 logger.error(f"Error loading queued X mentions: {e}") 1972 1973def process_x_notifications(void_agent, x_client, testing_mode=False): 1974 """ 1975 Fetch new X mentions, queue them, and process the queue. 1976 Similar to bsky.py process_notifications but for X. 1977 """ 1978 try: 1979 # Get username for fetching mentions - uses cached data to avoid rate limits 1980 username = x_client.get_username() 1981 if not username: 1982 logger.error("Could not get username for X mentions") 1983 return 1984 1985 # Fetch and queue new mentions 1986 new_count = fetch_and_queue_mentions(username) 1987 1988 if new_count > 0: 1989 logger.info(f"Found {new_count} new X mentions to process") 1990 1991 # Process the entire queue 1992 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1993 1994 except Exception as e: 1995 logger.error(f"Error processing X notifications: {e}") 1996 1997def initialize_x_void(): 1998 """Initialize the void agent for X operations.""" 1999 logger.info("Starting void agent initialization for X...") 2000 2001 from letta_client import Letta 2002 2003 # Get config 2004 config = get_x_letta_config() 2005 client = Letta(token=config['api_key'], timeout=config['timeout']) 2006 agent_id = config['agent_id'] 2007 2008 try: 2009 void_agent = client.agents.retrieve(agent_id=agent_id) 2010 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 2011 except Exception as e: 2012 logger.error(f"Failed to load void agent {agent_id}: {e}") 2013 raise e 2014 2015 # Ensure correct tools are attached for X 2016 logger.info("Configuring tools for X platform...") 2017 try: 2018 from tool_manager import ensure_platform_tools 2019 ensure_platform_tools('x', void_agent.id, config['api_key']) 2020 except Exception as e: 2021 logger.error(f"Failed to configure platform tools: {e}") 2022 logger.warning("Continuing with existing tool configuration") 2023 2024 # Log agent details 2025 logger.info(f"X Void agent details - ID: {void_agent.id}") 2026 logger.info(f"Agent name: {void_agent.name}") 2027 2028 return void_agent 2029 2030def x_main_loop(testing_mode=False, cleanup_interval=10): 2031 """ 2032 Main X bot loop that continuously monitors for mentions and processes them. 2033 Similar to bsky.py main() but for X/Twitter. 2034 2035 Args: 2036 testing_mode: If True, don't actually post to X 2037 cleanup_interval: Run user block cleanup every N cycles (0 to disable) 2038 """ 2039 import time 2040 from time import sleep 2041 from letta_client import Letta 2042 2043 logger.info("=== STARTING X VOID BOT ===") 2044 2045 # Configure logging from config file 2046 setup_logging_from_config() 2047 2048 # Initialize void agent 2049 void_agent = initialize_x_void() 2050 logger.info(f"X void agent initialized: {void_agent.id}") 2051 2052 # Initialize X client 2053 x_client = create_x_client() 2054 logger.info("Connected to X API") 2055 2056 # Get Letta client for periodic cleanup 2057 config = get_x_letta_config() 2058 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 2059 2060 # Main loop 2061 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls) 2062 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 2063 2064 if testing_mode: 2065 logger.info("=== RUNNING IN X TESTING MODE ===") 2066 logger.info(" - No messages will be sent to X") 2067 logger.info(" - Queue files will not be deleted") 2068 2069 if cleanup_interval > 0: 2070 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles") 2071 else: 2072 logger.info("User block cleanup disabled") 2073 2074 cycle_count = 0 2075 start_time = time.time() 2076 2077 while True: 2078 try: 2079 cycle_count += 1 2080 logger.info(f"=== X CYCLE {cycle_count} ===") 2081 2082 # Process X notifications (fetch, queue, and process) 2083 process_x_notifications(void_agent, x_client, testing_mode) 2084 2085 # Log cycle completion 2086 elapsed_time = time.time() - start_time 2087 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 2088 2089 sleep(FETCH_DELAY_SEC) 2090 2091 except KeyboardInterrupt: 2092 elapsed_time = time.time() - start_time 2093 logger.info("=== X BOT STOPPED BY USER ===") 2094 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 2095 break 2096 except Exception as e: 2097 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 2098 logger.error(f"Error details: {e}") 2099 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 2100 sleep(FETCH_DELAY_SEC * 2) 2101 2102def process_queue_only(testing_mode=False): 2103 """ 2104 Process all queued X mentions without fetching new ones. 2105 Useful for rate limit management - queue first, then process separately. 2106 2107 Args: 2108 testing_mode: If True, don't actually post to X and keep queue files 2109 """ 2110 logger.info("=== PROCESSING X QUEUE ONLY ===") 2111 2112 if testing_mode: 2113 logger.info("=== RUNNING IN X TESTING MODE ===") 2114 logger.info(" - No messages will be sent to X") 2115 logger.info(" - Queue files will not be deleted") 2116 2117 try: 2118 # Initialize void agent 2119 void_agent = initialize_x_void() 2120 logger.info(f"X void agent initialized: {void_agent.id}") 2121 2122 # Initialize X client 2123 x_client = create_x_client() 2124 logger.info("Connected to X API") 2125 2126 # Process the queue without fetching new mentions 2127 logger.info("Processing existing X queue...") 2128 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 2129 2130 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 2131 2132 except Exception as e: 2133 logger.error(f"Error processing X queue: {e}") 2134 raise 2135 2136def x_notification_loop(): 2137 """ 2138 DEPRECATED: Old X notification loop using search-based mention detection. 2139 Use x_main_loop() instead for the full bot experience. 2140 """ 2141 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 2142 x_main_loop() 2143 2144if __name__ == "__main__": 2145 import sys 2146 import argparse 2147 2148 if len(sys.argv) > 1: 2149 if sys.argv[1] == "bot": 2150 # Main bot with optional --test flag and cleanup interval 2151 parser = argparse.ArgumentParser(description='X Void Bot') 2152 parser.add_argument('command', choices=['bot']) 2153 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 2154 parser.add_argument('--cleanup-interval', type=int, default=10, 2155 help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 2156 args = parser.parse_args() 2157 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval) 2158 elif sys.argv[1] == "loop": 2159 x_notification_loop() 2160 elif sys.argv[1] == "reply": 2161 reply_to_cameron_post() 2162 elif sys.argv[1] == "me": 2163 get_my_user_info() 2164 elif sys.argv[1] == "search": 2165 test_search_mentions() 2166 elif sys.argv[1] == "queue": 2167 test_fetch_and_queue() 2168 elif sys.argv[1] == "thread": 2169 test_thread_context() 2170 elif sys.argv[1] == "process": 2171 # Process all queued mentions with optional --test flag 2172 testing_mode = "--test" in sys.argv 2173 process_queue_only(testing_mode=testing_mode) 2174 elif sys.argv[1] == "letta": 2175 # Use specific agent ID if provided, otherwise use from config 2176 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 2177 test_letta_integration(agent_id) 2178 elif sys.argv[1] == "downrank": 2179 # View or manage downrank list 2180 if len(sys.argv) > 2 and sys.argv[2] == "list": 2181 downrank_users = load_downrank_users() 2182 if downrank_users: 2183 print(f"📋 Downrank users ({len(downrank_users)} total):") 2184 for user_id in sorted(downrank_users): 2185 print(f" - {user_id}") 2186 else: 2187 print("📋 No downrank users configured") 2188 else: 2189 print("Usage: python x.py downrank list") 2190 print(" list - Show all downranked user IDs") 2191 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list") 2192 else: 2193 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]") 2194 print(" bot - Run the main X bot (use --test for testing mode)") 2195 print(" Example: python x.py bot --test") 2196 print(" queue - Fetch and queue mentions only (no processing)") 2197 print(" process - Process all queued mentions only (no fetching)") 2198 print(" Example: python x.py process --test") 2199 print(" downrank - Manage downrank users (10% response rate)") 2200 print(" Example: python x.py downrank list") 2201 print(" loop - Run the old notification monitoring loop (deprecated)") 2202 print(" reply - Reply to Cameron's specific post") 2203 print(" me - Get authenticated user info and correct user ID") 2204 print(" search - Test search-based mention detection") 2205 print(" thread - Test thread context retrieval from queued mention") 2206 print(" letta - Test sending thread context to Letta agent") 2207 print(" Optional: python x.py letta <agent-id>") 2208 else: 2209 test_x_client()