a digital person for bluesky
at x 102 kB view raw
1import os 2import logging 3import requests 4import yaml 5import json 6import hashlib 7import random 8import time 9from typing import Optional, Dict, Any, List, Set 10from datetime import datetime 11from pathlib import Path 12from requests_oauthlib import OAuth1 13from rich import print as rprint 14from rich.panel import Panel 15from rich.text import Text 16 17import bsky_utils 18 19class XRateLimitError(Exception): 20 """Exception raised when X API rate limit is exceeded""" 21 pass 22 23 24# Configure logging 25logging.basicConfig( 26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 27) 28logger = logging.getLogger("x_client") 29 30# X-specific file paths 31X_QUEUE_DIR = Path("x_queue") 32X_CACHE_DIR = Path("x_cache") 33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt") 36 37class XClient: 38 """X (Twitter) API client for fetching mentions and managing interactions.""" 39 40 def __init__(self, api_key: str, user_id: str, access_token: str = None, 41 consumer_key: str = None, consumer_secret: str = None, 42 access_token_secret: str = None): 43 self.api_key = api_key 44 self.access_token = access_token 45 self.user_id = user_id 46 self.base_url = "https://api.x.com/2" 47 48 # Check if we have OAuth 1.0a credentials 49 if (consumer_key and consumer_secret and access_token and access_token_secret): 50 # Use OAuth 1.0a for User Context 51 self.oauth = OAuth1( 52 consumer_key, 53 client_secret=consumer_secret, 54 resource_owner_key=access_token, 55 resource_owner_secret=access_token_secret 56 ) 57 self.headers = {"Content-Type": "application/json"} 58 self.auth_method = "oauth1a" 59 logger.info("Using OAuth 1.0a User Context authentication for X API") 60 elif access_token: 61 # Use OAuth 2.0 Bearer token for User Context 62 self.oauth = None 63 self.headers = { 64 "Authorization": f"Bearer {access_token}", 65 "Content-Type": "application/json" 66 } 67 self.auth_method = "oauth2_user" 68 logger.info("Using OAuth 2.0 User Context access token for X API") 69 else: 70 # Use Application-Only Bearer token 71 self.oauth = None 72 self.headers = { 73 "Authorization": f"Bearer {api_key}", 74 "Content-Type": "application/json" 75 } 76 self.auth_method = "bearer" 77 logger.info("Using Application-Only Bearer token for X API") 78 79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]: 80 """Make a request to the X API with proper error handling and exponential backoff.""" 81 url = f"{self.base_url}{endpoint}" 82 83 for attempt in range(max_retries): 84 try: 85 if method.upper() == "GET": 86 if self.oauth: 87 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 88 else: 89 response = requests.get(url, headers=self.headers, params=params) 90 elif method.upper() == "POST": 91 if self.oauth: 92 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 93 else: 94 response = requests.post(url, headers=self.headers, json=data) 95 else: 96 raise ValueError(f"Unsupported HTTP method: {method}") 97 98 response.raise_for_status() 99 return response.json() 100 101 except requests.exceptions.HTTPError as e: 102 if response.status_code == 401: 103 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 104 logger.error(f"Response: {response.text}") 105 return None # Don't retry auth failures 106 elif response.status_code == 403: 107 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 108 logger.error(f"Response: {response.text}") 109 return None # Don't retry permission failures 110 elif response.status_code == 429: 111 if attempt < max_retries - 1: 112 # Exponential backoff: 60s, 120s, 240s 113 backoff_time = 60 * (2 ** attempt) 114 logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 115 logger.error(f"Response: {response.text}") 116 time.sleep(backoff_time) 117 continue 118 else: 119 logger.error("X API rate limit exceeded - max retries reached") 120 logger.error(f"Response: {response.text}") 121 raise XRateLimitError("X API rate limit exceeded") 122 else: 123 if attempt < max_retries - 1: 124 # Exponential backoff for other HTTP errors too 125 backoff_time = 30 * (2 ** attempt) 126 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 127 logger.error(f"Response: {response.text}") 128 time.sleep(backoff_time) 129 continue 130 else: 131 logger.error(f"X API request failed after {max_retries} attempts: {e}") 132 logger.error(f"Response: {response.text}") 133 return None 134 135 except Exception as e: 136 if attempt < max_retries - 1: 137 backoff_time = 15 * (2 ** attempt) 138 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 139 time.sleep(backoff_time) 140 continue 141 else: 142 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}") 143 return None 144 145 return None 146 147 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[Dict]: 148 """ 149 Fetch mentions for the configured user with user data. 150 151 Args: 152 since_id: Minimum Post ID to include (for getting newer mentions) 153 max_results: Number of results to return (5-100) 154 155 Returns: 156 Dict with 'mentions' and 'users' keys, or None if request failed 157 """ 158 endpoint = f"/users/{self.user_id}/mentions" 159 params = { 160 "max_results": min(max(max_results, 5), 100), # Ensure within API limits 161 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 162 "user.fields": "id,name,username", 163 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 164 } 165 166 if since_id: 167 params["since_id"] = since_id 168 169 logger.info(f"Fetching mentions for user {self.user_id}") 170 response = self._make_request(endpoint, params) 171 172 if response: 173 logger.debug(f"X API response: {response}") 174 175 if response and "data" in response: 176 mentions = response["data"] 177 users_data = {} 178 179 # Extract user data from includes 180 if "includes" in response and "users" in response["includes"]: 181 for user in response["includes"]["users"]: 182 users_data[user["id"]] = user 183 logger.info(f"Retrieved user data for {len(users_data)} users") 184 185 logger.info(f"Retrieved {len(mentions)} mentions") 186 return {"mentions": mentions, "users": users_data} 187 else: 188 if response: 189 logger.info(f"No mentions in response. Full response: {response}") 190 else: 191 logger.warning("Request failed - no response received") 192 return {"mentions": [], "users": {}} 193 194 def get_user_info(self, user_id: str) -> Optional[Dict]: 195 """Get information about a specific user.""" 196 endpoint = f"/users/{user_id}" 197 params = { 198 "user.fields": "id,name,username,description,public_metrics" 199 } 200 201 response = self._make_request(endpoint, params) 202 return response.get("data") if response else None 203 204 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 205 """ 206 Search for mentions using the search endpoint instead of mentions endpoint. 207 This might have better rate limits than the direct mentions endpoint. 208 209 Args: 210 username: Username to search for mentions of (without @) 211 max_results: Number of results to return (10-100) 212 since_id: Only return results newer than this tweet ID 213 214 Returns: 215 List of tweets mentioning the username 216 """ 217 endpoint = "/tweets/search/recent" 218 219 # Search for mentions of the username 220 query = f"@{username}" 221 222 params = { 223 "query": query, 224 "max_results": min(max(max_results, 10), 100), 225 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 226 "user.fields": "id,name,username", 227 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 228 } 229 230 if since_id: 231 params["since_id"] = since_id 232 233 logger.info(f"Searching for mentions of @{username}") 234 response = self._make_request(endpoint, params) 235 236 if response and "data" in response: 237 tweets = response["data"] 238 logger.info(f"Found {len(tweets)} mentions via search") 239 return tweets 240 else: 241 if response: 242 logger.info(f"No mentions found via search. Response: {response}") 243 else: 244 logger.warning("Search request failed") 245 return [] 246 247 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]: 248 """ 249 Get all tweets in a conversation thread up to a specific tweet ID. 250 251 Args: 252 conversation_id: The conversation ID to fetch (should be the original tweet ID) 253 use_cache: Whether to use cached data if available 254 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID) 255 256 Returns: 257 List of tweets in the conversation, ordered chronologically 258 """ 259 # Check cache first if enabled 260 if use_cache: 261 cached_data = get_cached_thread_context(conversation_id) 262 if cached_data: 263 return cached_data 264 265 # First, get the original tweet directly since it might not appear in conversation search 266 original_tweet = None 267 try: 268 endpoint = f"/tweets/{conversation_id}" 269 params = { 270 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 271 "user.fields": "id,name,username", 272 "expansions": "author_id" 273 } 274 response = self._make_request(endpoint, params) 275 if response and "data" in response: 276 original_tweet = response["data"] 277 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}") 278 except Exception as e: 279 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}") 280 281 # Then search for all tweets in this conversation 282 endpoint = "/tweets/search/recent" 283 params = { 284 "query": f"conversation_id:{conversation_id}", 285 "max_results": 100, # Get as many as possible 286 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 287 "user.fields": "id,name,username", 288 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id", 289 "sort_order": "recency" # Get newest first, we'll reverse later 290 } 291 292 # Add until_id parameter to exclude tweets after the mention being processed 293 if until_id: 294 params["until_id"] = until_id 295 logger.info(f"Using until_id={until_id} to exclude future tweets") 296 297 logger.info(f"Fetching thread context for conversation {conversation_id}") 298 response = self._make_request(endpoint, params) 299 300 tweets = [] 301 users_data = {} 302 303 # Collect tweets from search 304 if response and "data" in response: 305 tweets.extend(response["data"]) 306 # Store user data for reference 307 if "includes" in response and "users" in response["includes"]: 308 for user in response["includes"]["users"]: 309 users_data[user["id"]] = user 310 311 # Add original tweet if we got it and it's not already in the list 312 if original_tweet: 313 tweet_ids = [t.get('id') for t in tweets] 314 if original_tweet.get('id') not in tweet_ids: 315 tweets.append(original_tweet) 316 logger.info("Added original tweet to thread context") 317 318 # Attempt to fill gaps by fetching referenced tweets that are missing 319 # This helps with X API's incomplete conversation search results 320 tweet_ids = set(t.get('id') for t in tweets) 321 missing_tweet_ids = set() 322 critical_missing_ids = set() 323 324 # Collect referenced tweet IDs, prioritizing critical ones 325 for tweet in tweets: 326 referenced_tweets = tweet.get('referenced_tweets', []) 327 for ref in referenced_tweets: 328 ref_id = ref.get('id') 329 ref_type = ref.get('type') 330 if ref_id and ref_id not in tweet_ids: 331 missing_tweet_ids.add(ref_id) 332 # Prioritize direct replies and quoted tweets over retweets 333 if ref_type in ['replied_to', 'quoted']: 334 critical_missing_ids.add(ref_id) 335 336 # For rate limit efficiency, only fetch critical missing tweets if we have many 337 if len(missing_tweet_ids) > 10: 338 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones") 339 missing_tweet_ids = critical_missing_ids 340 341 # Context sufficiency check - skip backfill if we already have enough context 342 if has_sufficient_context(tweets, missing_tweet_ids): 343 logger.info("Thread has sufficient context, skipping missing tweet backfill") 344 missing_tweet_ids = set() 345 346 # Fetch missing referenced tweets in batches (more rate-limit friendly) 347 if missing_tweet_ids: 348 missing_list = list(missing_tweet_ids) 349 350 # First, check cache for missing tweets 351 cached_tweets = get_cached_tweets(missing_list) 352 for tweet_id, cached_tweet in cached_tweets.items(): 353 if cached_tweet.get('conversation_id') == conversation_id: 354 tweets.append(cached_tweet) 355 tweet_ids.add(tweet_id) 356 logger.info(f"Retrieved missing tweet from cache: {tweet_id}") 357 358 # Add user data if available in cache 359 if cached_tweet.get('author_info'): 360 author_id = cached_tweet.get('author_id') 361 if author_id: 362 users_data[author_id] = cached_tweet['author_info'] 363 364 # Only fetch tweets that weren't found in cache 365 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets] 366 367 if uncached_ids: 368 batch_size = 100 # X API limit for bulk tweet lookup 369 370 for i in range(0, len(uncached_ids), batch_size): 371 batch_ids = uncached_ids[i:i + batch_size] 372 try: 373 endpoint = "/tweets" 374 params = { 375 "ids": ",".join(batch_ids), 376 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 377 "user.fields": "id,name,username", 378 "expansions": "author_id" 379 } 380 response = self._make_request(endpoint, params) 381 382 if response and "data" in response: 383 fetched_tweets = [] 384 batch_users_data = {} 385 386 for missing_tweet in response["data"]: 387 # Only add if it's actually part of this conversation 388 if missing_tweet.get('conversation_id') == conversation_id: 389 tweets.append(missing_tweet) 390 tweet_ids.add(missing_tweet.get('id')) 391 fetched_tweets.append(missing_tweet) 392 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}") 393 394 # Add user data if available 395 if "includes" in response and "users" in response["includes"]: 396 for user in response["includes"]["users"]: 397 users_data[user["id"]] = user 398 batch_users_data[user["id"]] = user 399 400 # Cache the newly fetched tweets 401 if fetched_tweets: 402 save_cached_tweets(fetched_tweets, batch_users_data) 403 404 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested") 405 406 # Handle partial success - log any missing tweets that weren't found 407 if response and "errors" in response: 408 for error in response["errors"]: 409 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}") 410 411 except Exception as e: 412 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}") 413 else: 414 logger.info(f"All {len(missing_list)} missing tweets found in cache") 415 416 if tweets: 417 # Filter out tweets that occur after until_id (if specified) 418 if until_id: 419 original_count = len(tweets) 420 # Convert until_id to int for comparison (Twitter IDs are sequential) 421 until_id_int = int(until_id) 422 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int] 423 filtered_count = len(tweets) 424 if original_count != filtered_count: 425 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}") 426 427 # Sort chronologically (oldest first) 428 tweets.sort(key=lambda x: x.get('created_at', '')) 429 logger.info(f"Retrieved {len(tweets)} tweets in thread") 430 431 thread_data = {"tweets": tweets, "users": users_data} 432 433 # Cache individual tweets from the thread for future backfill 434 save_cached_tweets(tweets, users_data) 435 436 # Cache the result 437 if use_cache: 438 save_cached_thread_context(conversation_id, thread_data) 439 440 return thread_data 441 else: 442 logger.warning("No tweets found for thread context") 443 return None 444 445 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 446 """ 447 Post a reply to a specific tweet. 448 449 Args: 450 reply_text: The text content of the reply 451 in_reply_to_tweet_id: The ID of the tweet to reply to 452 453 Returns: 454 Response data if successful, None if failed 455 """ 456 endpoint = "/tweets" 457 458 payload = { 459 "text": reply_text, 460 "reply": { 461 "in_reply_to_tweet_id": in_reply_to_tweet_id 462 } 463 } 464 465 logger.info(f"Attempting to post reply with {self.auth_method} authentication") 466 result = self._make_request(endpoint, method="POST", data=payload) 467 468 if result: 469 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}") 470 return result 471 else: 472 logger.error("Failed to post reply") 473 return None 474 475 def post_tweet(self, tweet_text: str) -> Optional[Dict]: 476 """ 477 Post a standalone tweet (not a reply). 478 479 Args: 480 tweet_text: The text content of the tweet (max 280 characters) 481 482 Returns: 483 Response data if successful, None if failed 484 """ 485 endpoint = "/tweets" 486 487 # Validate tweet length 488 if len(tweet_text) > 280: 489 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)") 490 return None 491 492 payload = { 493 "text": tweet_text 494 } 495 496 logger.info(f"Attempting to post tweet with {self.auth_method} authentication") 497 result = self._make_request(endpoint, method="POST", data=payload) 498 499 if result: 500 logger.info(f"Successfully posted tweet") 501 return result 502 else: 503 logger.error("Failed to post tweet") 504 return None 505 506def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]: 507 """Load X configuration from config file.""" 508 try: 509 with open(config_path, 'r') as f: 510 config = yaml.safe_load(f) 511 512 x_config = config.get('x', {}) 513 if not x_config.get('api_key') or not x_config.get('user_id'): 514 raise ValueError("X API key and user_id must be configured in config.yaml") 515 516 return x_config 517 except Exception as e: 518 logger.error(f"Failed to load X configuration: {e}") 519 raise 520 521def create_x_client(config_path: str = "config.yaml") -> XClient: 522 """Create and return an X client with configuration loaded from file.""" 523 config = load_x_config(config_path) 524 return XClient( 525 api_key=config['api_key'], 526 user_id=config['user_id'], 527 access_token=config.get('access_token'), 528 consumer_key=config.get('consumer_key'), 529 consumer_secret=config.get('consumer_secret'), 530 access_token_secret=config.get('access_token_secret') 531 ) 532 533def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str: 534 """ 535 Convert a mention object to a YAML string for better AI comprehension. 536 Similar to thread_to_yaml_string in bsky_utils.py 537 """ 538 # Extract relevant fields 539 simplified_mention = { 540 'id': mention.get('id'), 541 'text': mention.get('text'), 542 'author_id': mention.get('author_id'), 543 'created_at': mention.get('created_at'), 544 'in_reply_to_user_id': mention.get('in_reply_to_user_id') 545 } 546 547 # Add user information if available 548 if users_data and mention.get('author_id') in users_data: 549 user = users_data[mention.get('author_id')] 550 simplified_mention['author'] = { 551 'username': user.get('username'), 552 'name': user.get('name') 553 } 554 555 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 556 557def thread_to_yaml_string(thread_data: Dict) -> str: 558 """ 559 Convert X thread context to YAML string for AI comprehension. 560 Similar to Bluesky's thread_to_yaml_string function. 561 562 Args: 563 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context() 564 565 Returns: 566 YAML string representation of the thread 567 """ 568 if not thread_data or "tweets" not in thread_data: 569 return "conversation: []\n" 570 571 tweets = thread_data["tweets"] 572 users_data = thread_data.get("users", {}) 573 574 simplified_thread = { 575 "conversation": [] 576 } 577 578 for tweet in tweets: 579 # Get user info 580 author_id = tweet.get('author_id') 581 author_info = {} 582 if author_id and author_id in users_data: 583 user = users_data[author_id] 584 author_info = { 585 'username': user.get('username'), 586 'name': user.get('name') 587 } 588 589 # Build tweet object (simplified for AI consumption) 590 tweet_obj = { 591 'text': tweet.get('text'), 592 'created_at': tweet.get('created_at'), 593 'author': author_info, 594 'author_id': author_id # Include user ID for block management 595 } 596 597 simplified_thread["conversation"].append(tweet_obj) 598 599 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False) 600 601 602 603# X Caching and Queue System Functions 604 605def load_last_seen_id() -> Optional[str]: 606 """Load the last seen mention ID for incremental fetching.""" 607 if X_LAST_SEEN_FILE.exists(): 608 try: 609 with open(X_LAST_SEEN_FILE, 'r') as f: 610 data = json.load(f) 611 return data.get('last_seen_id') 612 except Exception as e: 613 logger.error(f"Error loading last seen ID: {e}") 614 return None 615 616def save_last_seen_id(mention_id: str): 617 """Save the last seen mention ID.""" 618 try: 619 X_QUEUE_DIR.mkdir(exist_ok=True) 620 with open(X_LAST_SEEN_FILE, 'w') as f: 621 json.dump({ 622 'last_seen_id': mention_id, 623 'updated_at': datetime.now().isoformat() 624 }, f) 625 logger.debug(f"Saved last seen ID: {mention_id}") 626 except Exception as e: 627 logger.error(f"Error saving last seen ID: {e}") 628 629def load_processed_mentions() -> set: 630 """Load the set of processed mention IDs.""" 631 if X_PROCESSED_MENTIONS_FILE.exists(): 632 try: 633 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 634 data = json.load(f) 635 # Keep only recent entries (last 10000) 636 if len(data) > 10000: 637 data = data[-10000:] 638 save_processed_mentions(set(data)) 639 return set(data) 640 except Exception as e: 641 logger.error(f"Error loading processed mentions: {e}") 642 return set() 643 644def save_processed_mentions(processed_set: set): 645 """Save the set of processed mention IDs.""" 646 try: 647 X_QUEUE_DIR.mkdir(exist_ok=True) 648 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 649 json.dump(list(processed_set), f) 650 except Exception as e: 651 logger.error(f"Error saving processed mentions: {e}") 652 653def load_downrank_users() -> Set[str]: 654 """Load the set of user IDs that should be downranked (responded to 10% of the time).""" 655 try: 656 if not X_DOWNRANK_USERS_FILE.exists(): 657 return set() 658 659 downrank_users = set() 660 with open(X_DOWNRANK_USERS_FILE, 'r') as f: 661 for line in f: 662 line = line.strip() 663 # Skip empty lines and comments 664 if line and not line.startswith('#'): 665 downrank_users.add(line) 666 667 logger.info(f"Loaded {len(downrank_users)} downrank users") 668 return downrank_users 669 except Exception as e: 670 logger.error(f"Error loading downrank users: {e}") 671 return set() 672 673def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool: 674 """ 675 Check if we should respond to a downranked user. 676 Returns True 10% of the time for downranked users, True 100% of the time for others. 677 """ 678 if user_id not in downrank_users: 679 return True 680 681 # 10% chance for downranked users 682 should_respond = random.random() < 0.1 683 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)") 684 return should_respond 685 686def save_mention_to_queue(mention: Dict, users_data: Optional[Dict] = None): 687 """Save a mention to the queue directory for async processing with author info. 688 689 Args: 690 mention: The mention data from X API 691 users_data: Optional dict mapping user IDs to user data (including usernames) 692 """ 693 try: 694 mention_id = mention.get('id') 695 if not mention_id: 696 logger.error("Mention missing ID, cannot queue") 697 return 698 699 # Check if already processed 700 processed_mentions = load_processed_mentions() 701 if mention_id in processed_mentions: 702 logger.debug(f"Mention {mention_id} already processed, skipping") 703 return 704 705 # Create queue directory 706 X_QUEUE_DIR.mkdir(exist_ok=True) 707 708 # Create filename using hash (similar to Bluesky system) 709 mention_str = json.dumps(mention, sort_keys=True) 710 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 711 filename = f"x_mention_{mention_hash}.json" 712 713 queue_file = X_QUEUE_DIR / filename 714 715 # Extract author info if users_data provided 716 author_id = mention.get('author_id') 717 author_username = None 718 author_name = None 719 720 if users_data and author_id and author_id in users_data: 721 user_info = users_data[author_id] 722 author_username = user_info.get('username') 723 author_name = user_info.get('name') 724 logger.info(f"Caching author info for @{author_username} ({author_name})") 725 726 # Save mention data with enhanced debugging information 727 mention_data = { 728 'mention': mention, 729 'queued_at': datetime.now().isoformat(), 730 'type': 'x_mention', 731 # Cache author info for later use 732 'author_info': { 733 'username': author_username, 734 'name': author_name, 735 'id': author_id 736 }, 737 # Debug info for conversation tracking 738 'debug_info': { 739 'mention_id': mention.get('id'), 740 'author_id': mention.get('author_id'), 741 'conversation_id': mention.get('conversation_id'), 742 'in_reply_to_user_id': mention.get('in_reply_to_user_id'), 743 'referenced_tweets': mention.get('referenced_tweets', []), 744 'text_preview': mention.get('text', '')[:200], 745 'created_at': mention.get('created_at'), 746 'public_metrics': mention.get('public_metrics', {}), 747 'context_annotations': mention.get('context_annotations', []) 748 } 749 } 750 751 with open(queue_file, 'w') as f: 752 json.dump(mention_data, f, indent=2) 753 754 logger.info(f"Queued X mention {mention_id} -> {filename}") 755 756 except Exception as e: 757 logger.error(f"Error saving mention to queue: {e}") 758 759# X Cache Functions 760def get_cached_thread_context(conversation_id: str) -> Optional[Dict]: 761 """Load cached thread context if available.""" 762 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 763 if cache_file.exists(): 764 try: 765 with open(cache_file, 'r') as f: 766 cached_data = json.load(f) 767 # Check if cache is recent (within 1 hour) 768 from datetime import datetime, timedelta 769 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 770 if datetime.now() - cached_time < timedelta(hours=1): 771 logger.info(f"Using cached thread context for {conversation_id}") 772 return cached_data.get('thread_data') 773 except Exception as e: 774 logger.warning(f"Error loading cached thread context: {e}") 775 return None 776 777def save_cached_thread_context(conversation_id: str, thread_data: Dict): 778 """Save thread context to cache.""" 779 try: 780 X_CACHE_DIR.mkdir(exist_ok=True) 781 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json" 782 783 cache_data = { 784 'conversation_id': conversation_id, 785 'thread_data': thread_data, 786 'cached_at': datetime.now().isoformat() 787 } 788 789 with open(cache_file, 'w') as f: 790 json.dump(cache_data, f, indent=2) 791 792 logger.debug(f"Cached thread context for {conversation_id}") 793 except Exception as e: 794 logger.error(f"Error caching thread context: {e}") 795 796def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]: 797 """ 798 Load cached individual tweets if available. 799 Returns dict mapping tweet_id -> tweet_data for found tweets. 800 """ 801 cached_tweets = {} 802 803 for tweet_id in tweet_ids: 804 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 805 if cache_file.exists(): 806 try: 807 with open(cache_file, 'r') as f: 808 cached_data = json.load(f) 809 810 # Use longer cache times for older tweets (24 hours vs 1 hour) 811 from datetime import datetime, timedelta 812 cached_time = datetime.fromisoformat(cached_data.get('cached_at', '')) 813 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '') 814 815 # Parse tweet creation time to determine age 816 try: 817 from dateutil.parser import parse 818 tweet_age = datetime.now() - parse(tweet_created) 819 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1) 820 except: 821 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails 822 823 if datetime.now() - cached_time < cache_duration: 824 cached_tweets[tweet_id] = cached_data.get('tweet_data') 825 logger.debug(f"Using cached tweet {tweet_id}") 826 827 except Exception as e: 828 logger.warning(f"Error loading cached tweet {tweet_id}: {e}") 829 830 return cached_tweets 831 832def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None): 833 """Save individual tweets to cache for future reuse.""" 834 try: 835 X_CACHE_DIR.mkdir(exist_ok=True) 836 837 for tweet in tweets_data: 838 tweet_id = tweet.get('id') 839 if not tweet_id: 840 continue 841 842 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json" 843 844 # Include user data if available 845 tweet_with_user = tweet.copy() 846 if users_data and tweet.get('author_id') in users_data: 847 tweet_with_user['author_info'] = users_data[tweet.get('author_id')] 848 849 cache_data = { 850 'tweet_id': tweet_id, 851 'tweet_data': tweet_with_user, 852 'cached_at': datetime.now().isoformat() 853 } 854 855 with open(cache_file, 'w') as f: 856 json.dump(cache_data, f, indent=2) 857 858 logger.debug(f"Cached individual tweet {tweet_id}") 859 860 except Exception as e: 861 logger.error(f"Error caching individual tweets: {e}") 862 863def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool: 864 """ 865 Determine if we have sufficient context to skip backfilling missing tweets. 866 867 Args: 868 tweets: List of tweets already in the thread 869 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch 870 871 Returns: 872 True if context is sufficient, False if backfill is needed 873 """ 874 # If no missing tweets, context is sufficient 875 if not missing_tweet_ids: 876 return True 877 878 # If we have a substantial conversation (5+ tweets), likely sufficient 879 if len(tweets) >= 5: 880 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient") 881 return True 882 883 # If only a few missing tweets and we have some context, might be enough 884 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3: 885 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient") 886 return True 887 888 # Check if we have conversational flow (mentions between users) 889 has_conversation_flow = False 890 for tweet in tweets: 891 text = tweet.get('text', '').lower() 892 # Look for mentions, replies, or conversational indicators 893 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1: 894 has_conversation_flow = True 895 break 896 897 # If we have clear conversational flow and reasonable length, sufficient 898 if has_conversation_flow and len(tweets) >= 2: 899 logger.debug("Thread has conversational flow, considering sufficient") 900 return True 901 902 # Otherwise, we need to backfill 903 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow") 904 return False 905 906def fetch_and_queue_mentions(username: str) -> int: 907 """ 908 Single-pass function to fetch new mentions and queue them. 909 Returns number of new mentions found. 910 """ 911 try: 912 client = create_x_client() 913 914 # Load last seen ID for incremental fetching 915 last_seen_id = load_last_seen_id() 916 917 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 918 919 # Search for mentions 920 # Get mentions with user data 921 result = client.get_mentions( 922 since_id=last_seen_id, 923 max_results=100 # Get as many as possible 924 ) 925 926 if not result or not result["mentions"]: 927 logger.info("No new mentions found") 928 return 0 929 930 mentions = result["mentions"] 931 users_data = result["users"] 932 933 # Process mentions (newest first, so reverse to process oldest first) 934 mentions.reverse() 935 new_count = 0 936 937 for mention in mentions: 938 save_mention_to_queue(mention, users_data) 939 new_count += 1 940 941 # Update last seen ID to the most recent mention 942 if mentions: 943 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 944 save_last_seen_id(most_recent_id) 945 946 logger.info(f"Queued {new_count} new X mentions") 947 return new_count 948 949 except Exception as e: 950 logger.error(f"Error fetching and queuing mentions: {e}") 951 return 0 952 953# Simple test function 954def get_my_user_info(): 955 """Get the authenticated user's information to find correct user ID.""" 956 try: 957 client = create_x_client() 958 959 # Use the /2/users/me endpoint to get authenticated user info 960 endpoint = "/users/me" 961 params = { 962 "user.fields": "id,name,username,description" 963 } 964 965 print("Fetching authenticated user information...") 966 response = client._make_request(endpoint, params=params) 967 968 if response and "data" in response: 969 user_data = response["data"] 970 print(f"✅ Found authenticated user:") 971 print(f" ID: {user_data.get('id')}") 972 print(f" Username: @{user_data.get('username')}") 973 print(f" Name: {user_data.get('name')}") 974 print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 975 print(f"\n🔧 Update your config.yaml with:") 976 print(f" user_id: \"{user_data.get('id')}\"") 977 return user_data 978 else: 979 print("❌ Failed to get user information") 980 print(f"Response: {response}") 981 return None 982 983 except Exception as e: 984 print(f"Error getting user info: {e}") 985 return None 986 987def test_search_mentions(): 988 """Test the search-based mention detection.""" 989 try: 990 client = create_x_client() 991 992 # First get our username 993 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 994 if not user_info or "data" not in user_info: 995 print("❌ Could not get username") 996 return 997 998 username = user_info["data"]["username"] 999 print(f"🔍 Searching for mentions of @{username}") 1000 1001 mentions = client.search_mentions(username, max_results=5) 1002 1003 if mentions: 1004 print(f"✅ Found {len(mentions)} mentions via search:") 1005 for mention in mentions: 1006 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 1007 else: 1008 print("No mentions found via search") 1009 1010 except Exception as e: 1011 print(f"Search test failed: {e}") 1012 1013def test_fetch_and_queue(): 1014 """Test the single-pass fetch and queue function.""" 1015 try: 1016 client = create_x_client() 1017 1018 # Get our username 1019 user_info = client._make_request("/users/me", params={"user.fields": "username"}) 1020 if not user_info or "data" not in user_info: 1021 print("❌ Could not get username") 1022 return 1023 1024 username = user_info["data"]["username"] 1025 print(f"🔄 Fetching and queueing mentions for @{username}") 1026 1027 # Show current state 1028 last_seen = load_last_seen_id() 1029 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 1030 1031 # Fetch and queue 1032 new_count = fetch_and_queue_mentions(username) 1033 1034 if new_count > 0: 1035 print(f"✅ Queued {new_count} new mentions") 1036 print(f"📁 Check ./x_queue/ directory for queued mentions") 1037 1038 # Show updated state 1039 new_last_seen = load_last_seen_id() 1040 print(f"📍 Updated last seen ID: {new_last_seen}") 1041 else: 1042 print("ℹ️ No new mentions to queue") 1043 1044 except Exception as e: 1045 print(f"Fetch and queue test failed: {e}") 1046 1047def test_thread_context(): 1048 """Test thread context retrieval from a queued mention.""" 1049 try: 1050 import json 1051 1052 # Find a queued mention file 1053 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1054 if not queue_files: 1055 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1056 return 1057 1058 # Read the first mention 1059 mention_file = queue_files[0] 1060 with open(mention_file, 'r') as f: 1061 mention_data = json.load(f) 1062 1063 mention = mention_data['mention'] 1064 print(f"📄 Using mention: {mention.get('id')}") 1065 print(f"📝 Text: {mention.get('text')}") 1066 1067 # Check if it has a conversation_id 1068 conversation_id = mention.get('conversation_id') 1069 if not conversation_id: 1070 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.") 1071 return 1072 1073 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1074 1075 # Get thread context 1076 client = create_x_client() 1077 thread_data = client.get_thread_context(conversation_id) 1078 1079 if thread_data: 1080 tweets = thread_data.get('tweets', []) 1081 print(f"✅ Retrieved thread with {len(tweets)} tweets") 1082 1083 # Convert to YAML 1084 yaml_thread = thread_to_yaml_string(thread_data) 1085 1086 # Save thread context for inspection 1087 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml" 1088 with open(thread_file, 'w') as f: 1089 f.write(yaml_thread) 1090 1091 print(f"💾 Saved thread context to: {thread_file}") 1092 print("\n📋 Thread preview:") 1093 print(yaml_thread) 1094 else: 1095 print("❌ Failed to retrieve thread context") 1096 1097 except Exception as e: 1098 print(f"Thread context test failed: {e}") 1099 1100def test_letta_integration(agent_id: str = None): 1101 """Test sending X thread context to Letta agent.""" 1102 try: 1103 from letta_client import Letta 1104 import json 1105 import yaml 1106 1107 # Load full config to access letta section 1108 try: 1109 with open("config.yaml", 'r') as f: 1110 full_config = yaml.safe_load(f) 1111 1112 letta_config = full_config.get('letta', {}) 1113 api_key = letta_config.get('api_key') 1114 config_agent_id = letta_config.get('agent_id') 1115 1116 # Use agent_id from config if not provided as parameter 1117 if not agent_id: 1118 if config_agent_id: 1119 agent_id = config_agent_id 1120 print(f"ℹ️ Using agent_id from config: {agent_id}") 1121 else: 1122 print("❌ No agent_id found in config.yaml") 1123 print("Expected config structure:") 1124 print(" letta:") 1125 print(" agent_id: your-agent-id") 1126 return 1127 else: 1128 print(f"ℹ️ Using provided agent_id: {agent_id}") 1129 1130 if not api_key: 1131 # Try loading from environment as fallback 1132 import os 1133 api_key = os.getenv('LETTA_API_KEY') 1134 if not api_key: 1135 print("❌ LETTA_API_KEY not found in config.yaml or environment") 1136 print("Expected config structure:") 1137 print(" letta:") 1138 print(" api_key: your-letta-api-key") 1139 return 1140 else: 1141 print("ℹ️ Using LETTA_API_KEY from environment") 1142 else: 1143 print("ℹ️ Using LETTA_API_KEY from config.yaml") 1144 1145 except Exception as e: 1146 print(f"❌ Error loading config: {e}") 1147 return 1148 1149 letta_client = Letta(token=api_key, timeout=600) 1150 print(f"🤖 Connected to Letta, using agent: {agent_id}") 1151 1152 # Find a queued mention file 1153 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1154 if not queue_files: 1155 print("❌ No queued mentions found. Run 'python x.py queue' first.") 1156 return 1157 1158 # Read the first mention 1159 mention_file = queue_files[0] 1160 with open(mention_file, 'r') as f: 1161 mention_data = json.load(f) 1162 1163 mention = mention_data['mention'] 1164 conversation_id = mention.get('conversation_id') 1165 1166 if not conversation_id: 1167 print("❌ No conversation_id found in mention.") 1168 return 1169 1170 print(f"🧵 Getting thread context for conversation: {conversation_id}") 1171 1172 # Get thread context 1173 x_client = create_x_client() 1174 thread_data = x_client.get_thread_context(conversation_id) 1175 1176 if not thread_data: 1177 print("❌ Failed to retrieve thread context") 1178 return 1179 1180 # Convert to YAML 1181 yaml_thread = thread_to_yaml_string(thread_data) 1182 1183 # Create prompt for the agent 1184 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately. 1185 1186Here is the thread context: 1187 1188{yaml_thread} 1189 1190Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona.""" 1191 1192 prompt_char_count = len(prompt) 1193 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars") 1194 1195 # Print the prompt in a rich panel 1196 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue")) 1197 1198 # Send to Letta agent using streaming 1199 message_stream = letta_client.agents.messages.create_stream( 1200 agent_id=agent_id, 1201 messages=[{"role": "user", "content": prompt}], 1202 stream_tokens=False, 1203 max_steps=10 1204 ) 1205 1206 print("🔄 Streaming response from agent...") 1207 response_text = "" 1208 1209 for chunk in message_stream: 1210 print(chunk) 1211 if hasattr(chunk, 'message_type'): 1212 if chunk.message_type == 'assistant_message': 1213 print(f"🤖 Agent response: {chunk.content}") 1214 response_text = chunk.content 1215 elif chunk.message_type == 'reasoning_message': 1216 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...") 1217 elif chunk.message_type == 'tool_call_message': 1218 print(f"🔧 Agent tool call: {chunk.tool_call.name}") 1219 1220 if response_text: 1221 print(f"\n✅ Agent generated response:") 1222 print(f"📝 Response: {response_text}") 1223 else: 1224 print("❌ No response generated by agent") 1225 1226 except Exception as e: 1227 print(f"Letta integration test failed: {e}") 1228 import traceback 1229 traceback.print_exc() 1230 1231def test_x_client(): 1232 """Test the X client by fetching mentions.""" 1233 try: 1234 client = create_x_client() 1235 result = client.get_mentions(max_results=5) 1236 1237 if result and result["mentions"]: 1238 mentions = result["mentions"] 1239 users_data = result["users"] 1240 print(f"Successfully retrieved {len(mentions)} mentions:") 1241 print(f"User data available for {len(users_data)} users") 1242 for mention in mentions: 1243 author_id = mention.get('author_id') 1244 author_info = users_data.get(author_id, {}) 1245 username = author_info.get('username', 'unknown') 1246 print(f"- {mention.get('id')} from @{username}: {mention.get('text')[:50]}...") 1247 else: 1248 print("No mentions retrieved") 1249 1250 except Exception as e: 1251 print(f"Test failed: {e}") 1252 1253def reply_to_cameron_post(): 1254 """ 1255 Reply to Cameron's specific X post. 1256 1257 NOTE: This requires OAuth User Context authentication, not Bearer token. 1258 Current Bearer token is Application-Only which can't post. 1259 """ 1260 try: 1261 client = create_x_client() 1262 1263 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618 1264 cameron_post_id = "1950690566909710618" 1265 1266 # Simple reply message 1267 reply_text = "Hello from void! 🤖 Testing X integration." 1268 1269 print(f"Attempting to reply to post {cameron_post_id}") 1270 print(f"Reply text: {reply_text}") 1271 print("\nNOTE: This will fail with current Bearer token (Application-Only)") 1272 print("Posting requires OAuth User Context authentication") 1273 1274 result = client.post_reply(reply_text, cameron_post_id) 1275 1276 if result: 1277 print(f"✅ Successfully posted reply!") 1278 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}") 1279 else: 1280 print("❌ Failed to post reply (expected with current auth)") 1281 1282 except Exception as e: 1283 print(f"Reply failed: {e}") 1284 1285def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False): 1286 """ 1287 Process an X mention and generate a reply using the Letta agent. 1288 Similar to bsky.py process_mention but for X/Twitter. 1289 1290 Args: 1291 void_agent: The Letta agent instance 1292 x_client: The X API client 1293 mention_data: The mention data dictionary 1294 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 1295 testing_mode: If True, don't actually post to X 1296 1297 Returns: 1298 True: Successfully processed, remove from queue 1299 False: Failed but retryable, keep in queue 1300 None: Failed with non-retryable error, move to errors directory 1301 "no_reply": No reply was generated, move to no_reply directory 1302 """ 1303 try: 1304 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}") 1305 1306 # Extract mention details 1307 if isinstance(mention_data, dict): 1308 # Handle both raw mention and queued mention formats 1309 if 'mention' in mention_data: 1310 mention = mention_data['mention'] 1311 else: 1312 mention = mention_data 1313 else: 1314 mention = mention_data 1315 1316 mention_id = mention.get('id') 1317 mention_text = mention.get('text', '') 1318 author_id = mention.get('author_id') 1319 conversation_id = mention.get('conversation_id') 1320 in_reply_to_user_id = mention.get('in_reply_to_user_id') 1321 referenced_tweets = mention.get('referenced_tweets', []) 1322 1323 # Check downrank list - only respond to downranked users 10% of the time 1324 downrank_users = load_downrank_users() 1325 if not should_respond_to_downranked_user(str(author_id), downrank_users): 1326 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection") 1327 return "no_reply" 1328 1329 # Enhanced conversation tracking for debug - especially important for Grok handling 1330 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}") 1331 logger.info(f" Author ID: {author_id}") 1332 logger.info(f" Conversation ID: {conversation_id}") 1333 logger.info(f" In Reply To User ID: {in_reply_to_user_id}") 1334 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items") 1335 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets 1336 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}") 1337 logger.info(f" Text preview: {mention_text[:100]}...") 1338 1339 # If no conversation_id, try to use referenced tweet as conversation root 1340 if not conversation_id and referenced_tweets: 1341 # For replies, use the tweet being replied to as conversation root 1342 for ref in referenced_tweets: 1343 if ref.get('type') == 'replied_to': 1344 conversation_id = ref.get('id') 1345 logger.info(f"📎 No conversation_id, using replied_to tweet {conversation_id} as conversation root") 1346 break 1347 1348 if not conversation_id: 1349 # If still no conversation ID, use the mention itself as a standalone 1350 conversation_id = mention_id 1351 logger.warning(f"⚠️ No conversation_id found for mention {mention_id} - treating as standalone tweet") 1352 1353 # Get thread context with caching enabled for efficiency 1354 # Use mention_id as until_id to exclude tweets that occurred after this mention 1355 try: 1356 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id) 1357 if not thread_data: 1358 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}") 1359 return False 1360 1361 # If this mention references a specific tweet, ensure we have that tweet in context 1362 if referenced_tweets: 1363 for ref in referenced_tweets: 1364 if ref.get('type') == 'replied_to': 1365 ref_id = ref.get('id') 1366 # Check if the referenced tweet is in our thread data 1367 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])] 1368 if ref_id and ref_id not in thread_tweet_ids: 1369 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch") 1370 try: 1371 # Fetch the missing referenced tweet directly 1372 endpoint = f"/tweets/{ref_id}" 1373 params = { 1374 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id", 1375 "user.fields": "id,name,username", 1376 "expansions": "author_id" 1377 } 1378 response = x_client._make_request(endpoint, params) 1379 if response and "data" in response: 1380 missing_tweet = response["data"] 1381 if missing_tweet.get('conversation_id') == conversation_id: 1382 # Add to thread data 1383 if 'tweets' not in thread_data: 1384 thread_data['tweets'] = [] 1385 thread_data['tweets'].append(missing_tweet) 1386 1387 # Add user data if available 1388 if "includes" in response and "users" in response["includes"]: 1389 if 'users' not in thread_data: 1390 thread_data['users'] = {} 1391 for user in response["includes"]["users"]: 1392 thread_data['users'][user["id"]] = user 1393 1394 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context") 1395 else: 1396 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}") 1397 except Exception as e: 1398 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}") 1399 1400 # Enhanced thread context debugging 1401 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}") 1402 thread_posts = thread_data.get('tweets', []) 1403 thread_users = thread_data.get('users', {}) 1404 logger.info(f" Posts in thread: {len(thread_posts)}") 1405 logger.info(f" Users in thread: {len(thread_users)}") 1406 1407 # Log thread participants for Grok detection 1408 for user_id, user_info in thread_users.items(): 1409 username = user_info.get('username', 'unknown') 1410 name = user_info.get('name', 'Unknown') 1411 is_verified = user_info.get('verified', False) 1412 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}") 1413 1414 # Special logging for Grok or AI-related users 1415 if 'grok' in username.lower() or 'grok' in name.lower(): 1416 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})") 1417 1418 # Log conversation structure 1419 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts 1420 post_id = post.get('id') 1421 post_author = post.get('author_id') 1422 post_text = post.get('text', '')[:50] 1423 is_reply = 'in_reply_to_user_id' in post 1424 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...") 1425 1426 except Exception as e: 1427 logger.error(f"❌ Error getting thread context: {e}") 1428 return False 1429 1430 # Convert to YAML string 1431 thread_context = thread_to_yaml_string(thread_data) 1432 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters") 1433 1434 # Save comprehensive conversation data for debugging 1435 try: 1436 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1437 debug_dir.mkdir(parents=True, exist_ok=True) 1438 1439 # Save raw thread data (JSON) 1440 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f: 1441 json.dump(thread_data, f, indent=2) 1442 1443 # Save YAML thread context 1444 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f: 1445 f.write(thread_context) 1446 1447 # Save mention processing debug info 1448 debug_info = { 1449 'processed_at': datetime.now().isoformat(), 1450 'mention_id': mention_id, 1451 'conversation_id': conversation_id, 1452 'author_id': author_id, 1453 'in_reply_to_user_id': in_reply_to_user_id, 1454 'referenced_tweets': referenced_tweets, 1455 'thread_stats': { 1456 'total_posts': len(thread_posts), 1457 'total_users': len(thread_users), 1458 'yaml_length': len(thread_context) 1459 }, 1460 'users_in_conversation': { 1461 user_id: { 1462 'username': user_info.get('username'), 1463 'name': user_info.get('name'), 1464 'verified': user_info.get('verified', False), 1465 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower() 1466 } 1467 for user_id, user_info in thread_users.items() 1468 } 1469 } 1470 1471 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f: 1472 json.dump(debug_info, f, indent=2) 1473 1474 logger.info(f"💾 Saved conversation debug data to: {debug_dir}") 1475 1476 except Exception as debug_error: 1477 logger.warning(f"Failed to save debug data: {debug_error}") 1478 # Continue processing even if debug save fails 1479 1480 # Check for #voidstop 1481 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower(): 1482 logger.info("Found #voidstop, skipping this mention") 1483 return True 1484 1485 # Note: X user block attachment removed - no longer using user-specific memory blocks 1486 1487 # Create prompt for Letta agent 1488 # First try to use cached author info from queued mention 1489 author_username = 'unknown' 1490 author_name = 'unknown' 1491 1492 if 'author_info' in mention_data: 1493 # Use cached author info from when mention was queued 1494 cached_info = mention_data['author_info'] 1495 if cached_info.get('username'): 1496 author_username = cached_info['username'] 1497 author_name = cached_info.get('name', author_username) 1498 logger.info(f"Using cached author info: @{author_username} ({author_name})") 1499 1500 # If not cached, try thread data 1501 if author_username == 'unknown': 1502 author_info = thread_data.get('users', {}).get(author_id, {}) 1503 author_username = author_info.get('username', 'unknown') 1504 author_name = author_info.get('name', author_username) 1505 1506 # Final fallback: if username is still unknown, try to find it in the thread tweets 1507 if author_username == 'unknown' and 'tweets' in thread_data: 1508 for tweet in thread_data['tweets']: 1509 if tweet.get('author_id') == author_id and 'author' in tweet: 1510 tweet_author = tweet['author'] 1511 if tweet_author.get('username'): 1512 author_username = tweet_author['username'] 1513 author_name = tweet_author.get('name', author_username) 1514 logger.info(f"Resolved unknown author via thread fallback: @{author_username} ({author_name})") 1515 break 1516 1517 # Build user ID mapping from thread data 1518 user_id_mapping = {} 1519 if 'tweets' in thread_data: 1520 for tweet in thread_data['tweets']: 1521 author_id_tweet = tweet.get('author_id') 1522 if author_id_tweet and 'author' in tweet: 1523 tweet_author = tweet['author'] 1524 username = tweet_author.get('username') 1525 if username and author_id_tweet not in user_id_mapping: 1526 user_id_mapping[author_id_tweet] = f"@{username}" 1527 1528 # Also add users from the users dict if available 1529 if 'users' in thread_data: 1530 for user_id, user_data in thread_data['users'].items(): 1531 username = user_data.get('username') 1532 if username and user_id not in user_id_mapping: 1533 user_id_mapping[user_id] = f"@{username}" 1534 1535 # Format user ID mapping for prompt 1536 id_mapping_text = "" 1537 if user_id_mapping: 1538 id_mapping_lines = [f" {user_id}: {handle}" for user_id, handle in user_id_mapping.items()] 1539 id_mapping_text = f"\n\nUSER_ID_KEY:\n" + "\n".join(id_mapping_lines) 1540 1541 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}). 1542 1543MOST RECENT POST (the mention you're responding to): 1544"{mention_text}" 1545 1546FULL THREAD CONTEXT: 1547```yaml 1548{thread_context} 1549``` 1550 1551The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.{id_mapping_text} 1552 1553If you need to update user information, use the x_user_* tools. 1554 1555To reply, use the add_post_to_x_thread tool: 1556- Each call creates one post (max 280 characters) 1557- For most responses, a single call is sufficient 1558- Only use multiple calls for threaded replies when: 1559 * The topic requires extended explanation that cannot fit in 280 characters 1560 * You're explicitly asked for a detailed/long response 1561 * The conversation naturally benefits from a structured multi-part answer 1562- Avoid unnecessary threads - be concise when possible""" 1563 1564 # Log mention processing 1565 title = f"X MENTION FROM @{author_username}" 1566 print(f"\n{title}") 1567 print(f" {'' * len(title)}") 1568 for line in mention_text.split('\n'): 1569 print(f" {line}") 1570 1571 # Send to Letta agent 1572 from config_loader import get_letta_config 1573 from letta_client import Letta 1574 1575 config = get_letta_config() 1576 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 1577 1578 prompt_char_count = len(prompt) 1579 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars") 1580 1581 try: 1582 # Use streaming to avoid timeout errors 1583 message_stream = letta_client.agents.messages.create_stream( 1584 agent_id=void_agent.id, 1585 messages=[{"role": "user", "content": prompt}], 1586 stream_tokens=False, 1587 max_steps=100 1588 ) 1589 1590 # Collect streaming response (simplified version of bsky.py logic) 1591 all_messages = [] 1592 for chunk in message_stream: 1593 if hasattr(chunk, 'message_type'): 1594 if chunk.message_type == 'reasoning_message': 1595 print("\n◆ Reasoning") 1596 print(" ─────────") 1597 for line in chunk.reasoning.split('\n'): 1598 print(f" {line}") 1599 elif chunk.message_type == 'tool_call_message': 1600 tool_name = chunk.tool_call.name 1601 if tool_name == 'add_post_to_x_thread': 1602 try: 1603 args = json.loads(chunk.tool_call.arguments) 1604 text = args.get('text', '') 1605 if text: 1606 print("\n✎ X Post") 1607 print(" ────────") 1608 for line in text.split('\n'): 1609 print(f" {line}") 1610 except: 1611 pass 1612 elif tool_name == 'halt_activity': 1613 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT") 1614 if queue_filepath and queue_filepath.exists(): 1615 queue_filepath.unlink() 1616 logger.info(f"Deleted queue file: {queue_filepath.name}") 1617 logger.info("=== X BOT TERMINATED BY AGENT ===") 1618 exit(0) 1619 elif chunk.message_type == 'tool_return_message': 1620 tool_name = chunk.name 1621 status = chunk.status 1622 if status == 'success' and tool_name == 'add_post_to_x_thread': 1623 print("\n✓ X Post Queued") 1624 print(" ──────────────") 1625 print(" Post queued successfully") 1626 elif chunk.message_type == 'assistant_message': 1627 print("\n▶ Assistant Response") 1628 print(" ──────────────────") 1629 for line in chunk.content.split('\n'): 1630 print(f" {line}") 1631 1632 all_messages.append(chunk) 1633 if str(chunk) == 'done': 1634 break 1635 1636 # Convert streaming response for compatibility 1637 message_response = type('StreamingResponse', (), { 1638 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 1639 })() 1640 1641 except Exception as api_error: 1642 logger.error(f"Letta API error: {api_error}") 1643 raise 1644 1645 # Extract successful add_post_to_x_thread tool calls 1646 reply_candidates = [] 1647 tool_call_results = {} 1648 ignored_notification = False 1649 ack_note = None # Track any note from annotate_ack tool 1650 1651 # First pass: collect tool return statuses 1652 for message in message_response.messages: 1653 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 1654 if message.name == 'add_post_to_x_thread': 1655 tool_call_results[message.tool_call_id] = message.status 1656 elif message.name == 'ignore_notification': 1657 if message.status == 'success': 1658 ignored_notification = True 1659 logger.info("🚫 X notification ignored") 1660 1661 # Second pass: collect successful tool calls 1662 for message in message_response.messages: 1663 if hasattr(message, 'tool_call') and message.tool_call: 1664 # Collect annotate_ack tool calls 1665 if message.tool_call.name == 'annotate_ack': 1666 try: 1667 args = json.loads(message.tool_call.arguments) 1668 note = args.get('note', '') 1669 if note: 1670 ack_note = note 1671 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 1672 except json.JSONDecodeError as e: 1673 logger.error(f"Failed to parse annotate_ack arguments: {e}") 1674 1675 # Collect add_post_to_x_thread tool calls - only if they were successful 1676 elif message.tool_call.name == 'add_post_to_x_thread': 1677 tool_call_id = message.tool_call.tool_call_id 1678 tool_status = tool_call_results.get(tool_call_id, 'unknown') 1679 1680 if tool_status == 'success': 1681 try: 1682 args = json.loads(message.tool_call.arguments) 1683 reply_text = args.get('text', '') 1684 if reply_text: 1685 reply_candidates.append(reply_text) 1686 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...") 1687 except json.JSONDecodeError as e: 1688 logger.error(f"Failed to parse tool call arguments: {e}") 1689 1690 # Save agent response data to debug folder 1691 try: 1692 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}" 1693 1694 # Save complete agent interaction 1695 agent_response_data = { 1696 'processed_at': datetime.now().isoformat(), 1697 'mention_id': mention_id, 1698 'conversation_id': conversation_id, 1699 'prompt_sent': prompt, 1700 'reply_candidates': reply_candidates, 1701 'ignored_notification': ignored_notification, 1702 'ack_note': ack_note, 1703 'tool_call_results': tool_call_results, 1704 'all_messages': [] 1705 } 1706 1707 # Convert messages to serializable format 1708 for message in message_response.messages: 1709 msg_data = { 1710 'message_type': getattr(message, 'message_type', 'unknown'), 1711 'content': getattr(message, 'content', ''), 1712 'reasoning': getattr(message, 'reasoning', ''), 1713 'status': getattr(message, 'status', ''), 1714 'name': getattr(message, 'name', ''), 1715 } 1716 1717 if hasattr(message, 'tool_call') and message.tool_call: 1718 msg_data['tool_call'] = { 1719 'name': message.tool_call.name, 1720 'arguments': message.tool_call.arguments, 1721 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '') 1722 } 1723 1724 agent_response_data['all_messages'].append(msg_data) 1725 1726 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f: 1727 json.dump(agent_response_data, f, indent=2) 1728 1729 logger.info(f"💾 Saved agent response debug data") 1730 1731 except Exception as debug_error: 1732 logger.warning(f"Failed to save agent response debug data: {debug_error}") 1733 1734 # Handle conflicts 1735 if reply_candidates and ignored_notification: 1736 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!") 1737 return False 1738 1739 if reply_candidates: 1740 # Post replies to X 1741 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X") 1742 1743 if len(reply_candidates) == 1: 1744 content = reply_candidates[0] 1745 title = f"Reply to @{author_username}" 1746 else: 1747 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)]) 1748 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)" 1749 1750 print(f"\n{title}") 1751 print(f" {'' * len(title)}") 1752 for line in content.split('\n'): 1753 print(f" {line}") 1754 1755 if testing_mode: 1756 logger.info("TESTING MODE: Skipping actual X post") 1757 return True 1758 else: 1759 # Post to X using thread approach 1760 success = post_x_thread_replies(x_client, mention_id, reply_candidates) 1761 if success: 1762 logger.info(f"Successfully replied to @{author_username} on X") 1763 1764 # Acknowledge the post we're replying to 1765 try: 1766 ack_result = acknowledge_x_post(x_client, mention_id, ack_note) 1767 if ack_result: 1768 if ack_note: 1769 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")") 1770 else: 1771 logger.info(f"Successfully acknowledged X post from @{author_username}") 1772 else: 1773 logger.warning(f"Failed to acknowledge X post from @{author_username}") 1774 except Exception as e: 1775 logger.error(f"Error acknowledging X post from @{author_username}: {e}") 1776 # Don't fail the entire operation if acknowledgment fails 1777 1778 return True 1779 else: 1780 logger.error(f"Failed to send reply to @{author_username} on X") 1781 return False 1782 else: 1783 if ignored_notification: 1784 logger.info(f"X mention from @{author_username} was explicitly ignored") 1785 return "ignored" 1786 else: 1787 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass") 1788 return False # Keep in queue for retry instead of removing 1789 1790 except Exception as e: 1791 logger.error(f"Error processing X mention: {e}") 1792 return False 1793 1794def acknowledge_x_post(x_client, post_id, note=None): 1795 """ 1796 Acknowledge an X post that we replied to. 1797 Uses the same Bluesky client and uploads to the void data repository on atproto, 1798 just like Bluesky acknowledgments. 1799 1800 Args: 1801 x_client: XClient instance (not used, kept for compatibility) 1802 post_id: The X post ID we're acknowledging 1803 note: Optional note to include with the acknowledgment 1804 1805 Returns: 1806 True if successful, False otherwise 1807 """ 1808 try: 1809 # Use Bluesky client to upload acks to the void data repository on atproto 1810 bsky_client = bsky_utils.default_login() 1811 1812 # Create a synthetic URI and CID for the X post 1813 # X posts don't have atproto URIs/CIDs, so we create identifiers 1814 post_uri = f"x://twitter.com/post/{post_id}" 1815 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts 1816 1817 # Use the same acknowledge_post function as Bluesky 1818 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note) 1819 1820 if ack_result: 1821 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else "")) 1822 return True 1823 else: 1824 logger.error(f"Failed to acknowledge X post {post_id}") 1825 return False 1826 1827 except Exception as e: 1828 logger.error(f"Error acknowledging X post {post_id}: {e}") 1829 return False 1830 1831def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages): 1832 """ 1833 Post a series of replies to X, threading them properly. 1834 1835 Args: 1836 x_client: XClient instance 1837 in_reply_to_tweet_id: The original tweet ID to reply to 1838 reply_messages: List of reply text strings 1839 1840 Returns: 1841 True if successful, False otherwise 1842 """ 1843 try: 1844 current_reply_id = in_reply_to_tweet_id 1845 1846 for i, reply_text in enumerate(reply_messages): 1847 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...") 1848 1849 result = x_client.post_reply(reply_text, current_reply_id) 1850 1851 if result and 'data' in result: 1852 new_tweet_id = result['data']['id'] 1853 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}") 1854 # For threading, the next reply should reply to this one 1855 current_reply_id = new_tweet_id 1856 else: 1857 logger.error(f"Failed to post X reply {i+1}") 1858 return False 1859 1860 return True 1861 1862 except Exception as e: 1863 logger.error(f"Error posting X thread replies: {e}") 1864 return False 1865 1866def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False): 1867 """ 1868 Load and process all X mentions from the queue. 1869 Similar to bsky.py load_and_process_queued_notifications but for X. 1870 """ 1871 try: 1872 # Get all X mention files in queue directory 1873 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json")) 1874 1875 if not queue_files: 1876 return 1877 1878 # Load file metadata and sort by creation time (chronological order) 1879 file_metadata = [] 1880 for filepath in queue_files: 1881 try: 1882 with open(filepath, 'r') as f: 1883 queue_data = json.load(f) 1884 mention_data = queue_data.get('mention', queue_data) 1885 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing 1886 file_metadata.append((created_at, filepath)) 1887 except Exception as e: 1888 logger.warning(f"Error reading queue file {filepath.name}: {e}") 1889 # Add with default timestamp so it still gets processed 1890 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath)) 1891 1892 # Sort by creation time (oldest first) 1893 file_metadata.sort(key=lambda x: x[0]) 1894 1895 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order") 1896 1897 for i, (created_at, filepath) in enumerate(file_metadata, 1): 1898 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})") 1899 1900 try: 1901 # Load mention data 1902 with open(filepath, 'r') as f: 1903 queue_data = json.load(f) 1904 1905 # Process the mention (pass full queue_data to have access to author_info) 1906 success = process_x_mention(void_agent, x_client, queue_data, 1907 queue_filepath=filepath, testing_mode=testing_mode) 1908 1909 except XRateLimitError: 1910 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning") 1911 break 1912 1913 except Exception as e: 1914 logger.error(f"Error processing X queue file {filepath.name}: {e}") 1915 continue 1916 1917 # Handle file based on processing result 1918 if success: 1919 if testing_mode: 1920 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}") 1921 else: 1922 filepath.unlink() 1923 logger.info(f"Successfully processed and removed X file: {filepath.name}") 1924 1925 # Mark as processed 1926 processed_mentions = load_processed_mentions() 1927 processed_mentions.add(mention_data.get('id')) 1928 save_processed_mentions(processed_mentions) 1929 1930 elif success is None: # Move to error directory 1931 error_dir = X_QUEUE_DIR / "errors" 1932 error_dir.mkdir(exist_ok=True) 1933 error_path = error_dir / filepath.name 1934 filepath.rename(error_path) 1935 logger.warning(f"Moved X file {filepath.name} to errors directory") 1936 1937 elif success == "no_reply": # Move to no_reply directory 1938 no_reply_dir = X_QUEUE_DIR / "no_reply" 1939 no_reply_dir.mkdir(exist_ok=True) 1940 no_reply_path = no_reply_dir / filepath.name 1941 filepath.rename(no_reply_path) 1942 logger.info(f"Moved X file {filepath.name} to no_reply directory") 1943 1944 elif success == "ignored": # Delete ignored notifications 1945 filepath.unlink() 1946 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}") 1947 1948 else: 1949 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry") 1950 1951 except Exception as e: 1952 logger.error(f"Error loading queued X mentions: {e}") 1953 1954def process_x_notifications(void_agent, x_client, testing_mode=False): 1955 """ 1956 Fetch new X mentions, queue them, and process the queue. 1957 Similar to bsky.py process_notifications but for X. 1958 """ 1959 try: 1960 # Get username for fetching mentions 1961 user_info = x_client._make_request("/users/me", params={"user.fields": "username"}) 1962 if not user_info or "data" not in user_info: 1963 logger.error("Could not get username for X mentions") 1964 return 1965 1966 username = user_info["data"]["username"] 1967 1968 # Fetch and queue new mentions 1969 new_count = fetch_and_queue_mentions(username) 1970 1971 if new_count > 0: 1972 logger.info(f"Found {new_count} new X mentions to process") 1973 1974 # Process the entire queue 1975 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 1976 1977 except Exception as e: 1978 logger.error(f"Error processing X notifications: {e}") 1979 1980def periodic_user_block_cleanup(client, agent_id: str) -> None: 1981 """ 1982 Detach all user blocks from the agent to prevent memory bloat. 1983 This should be called periodically to ensure clean state. 1984 """ 1985 try: 1986 # Get all blocks attached to the agent 1987 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1988 1989 user_blocks_to_detach = [] 1990 for block in attached_blocks: 1991 if hasattr(block, 'label') and block.label.startswith('user_'): 1992 user_blocks_to_detach.append({ 1993 'label': block.label, 1994 'id': block.id 1995 }) 1996 1997 if not user_blocks_to_detach: 1998 logger.debug("No user blocks found to detach during periodic cleanup") 1999 return 2000 2001 logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach") 2002 2003 # Detach each user block 2004 for block in user_blocks_to_detach: 2005 try: 2006 client.agents.blocks.detach( 2007 agent_id=agent_id, 2008 block_id=block['id'] 2009 ) 2010 logger.debug(f"Detached user block: {block['label']}") 2011 except Exception as e: 2012 logger.error(f"Failed to detach user block {block['label']}: {e}") 2013 2014 logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks") 2015 2016 except Exception as e: 2017 logger.error(f"Error during periodic user block cleanup: {e}") 2018 2019def initialize_x_void(): 2020 """Initialize the void agent for X operations.""" 2021 logger.info("Starting void agent initialization for X...") 2022 2023 from config_loader import get_letta_config 2024 from letta_client import Letta 2025 2026 # Get config 2027 config = get_letta_config() 2028 client = Letta(token=config['api_key'], timeout=config['timeout']) 2029 agent_id = config['agent_id'] 2030 2031 try: 2032 void_agent = client.agents.retrieve(agent_id=agent_id) 2033 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})") 2034 except Exception as e: 2035 logger.error(f"Failed to load void agent {agent_id}: {e}") 2036 raise e 2037 2038 # Clean up all user blocks at startup 2039 logger.info("🧹 Cleaning up user blocks at X startup...") 2040 periodic_user_block_cleanup(client, agent_id) 2041 2042 # Ensure correct tools are attached for X 2043 logger.info("Configuring tools for X platform...") 2044 try: 2045 from tool_manager import ensure_platform_tools 2046 ensure_platform_tools('x', void_agent.id) 2047 except Exception as e: 2048 logger.error(f"Failed to configure platform tools: {e}") 2049 logger.warning("Continuing with existing tool configuration") 2050 2051 # Log agent details 2052 logger.info(f"X Void agent details - ID: {void_agent.id}") 2053 logger.info(f"Agent name: {void_agent.name}") 2054 2055 return void_agent 2056 2057def send_x_synthesis_message(letta_client, agent_id): 2058 """ 2059 Send a simple synthesis message to the X agent. 2060 Simplified version for X bot without atproto dependencies. 2061 """ 2062 try: 2063 from datetime import date 2064 today = date.today() 2065 2066 synthesis_prompt = f"""Time for synthesis and reflection on your X (Twitter) experiences. 2067 2068Today's date: {today.strftime('%B %d, %Y')} 2069 2070Please reflect on: 20711. Recent X interactions and conversations you've had 20722. Patterns you've observed in X discussions 20733. Your evolving understanding of the X platform dynamics 20744. Notable exchanges or insights from X users 20755. How your presence on X is developing 2076 2077Use your memory tools to record significant insights and observations.""" 2078 2079 logger.info("🧠 Sending X synthesis message to agent") 2080 2081 # Send simple synthesis message 2082 response = letta_client.agents.messages.create( 2083 agent_id=agent_id, 2084 messages=[{"role": "user", "content": synthesis_prompt}], 2085 max_steps=50 2086 ) 2087 2088 logger.info("✅ X synthesis message completed successfully") 2089 return True 2090 2091 except Exception as e: 2092 logger.error(f"Error sending X synthesis message: {e}") 2093 return False 2094 2095def x_main_loop(testing_mode=False, cleanup_interval=10, synthesis_interval=600, synthesis_only=False): 2096 """ 2097 Main X bot loop that continuously monitors for mentions and processes them. 2098 Similar to bsky.py main() but for X/Twitter. 2099 2100 Args: 2101 testing_mode: If True, don't actually post to X 2102 cleanup_interval: Run user block cleanup every N cycles (0 to disable) 2103 synthesis_interval: Send synthesis message every N seconds (0 to disable) 2104 synthesis_only: If True, only send synthesis messages (no notification processing) 2105 """ 2106 import time 2107 from time import sleep 2108 from config_loader import get_letta_config 2109 from letta_client import Letta 2110 # We'll create our own X synthesis function instead 2111 2112 logger.info("=== STARTING X VOID BOT ===") 2113 2114 # Initialize void agent 2115 void_agent = initialize_x_void() 2116 logger.info(f"X void agent initialized: {void_agent.id}") 2117 2118 # Initialize X client 2119 x_client = create_x_client() 2120 logger.info("Connected to X API") 2121 2122 # Get Letta client for periodic cleanup 2123 config = get_letta_config() 2124 letta_client = Letta(token=config['api_key'], timeout=config['timeout']) 2125 2126 # Main loop 2127 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls) 2128 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds") 2129 2130 if testing_mode: 2131 logger.info("=== RUNNING IN X TESTING MODE ===") 2132 logger.info(" - No messages will be sent to X") 2133 logger.info(" - Queue files will not be deleted") 2134 2135 if cleanup_interval > 0: 2136 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles") 2137 else: 2138 logger.info("User block cleanup disabled") 2139 2140 if synthesis_interval > 0: 2141 logger.info(f"Synthesis messages enabled every {synthesis_interval} seconds ({synthesis_interval/60:.1f} minutes)") 2142 else: 2143 logger.info("Synthesis messages disabled") 2144 2145 # Synthesis-only mode 2146 if synthesis_only: 2147 if synthesis_interval <= 0: 2148 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 2149 return 2150 2151 logger.info(f"Starting X synthesis-only mode, interval: {synthesis_interval} seconds ({synthesis_interval/60:.1f} minutes)") 2152 2153 while True: 2154 try: 2155 # Send synthesis message immediately on first run 2156 logger.info("🧠 Sending X synthesis message") 2157 send_x_synthesis_message(letta_client, void_agent.id) 2158 2159 # Wait for next interval 2160 logger.info(f"Waiting {synthesis_interval} seconds until next synthesis...") 2161 sleep(synthesis_interval) 2162 2163 except KeyboardInterrupt: 2164 logger.info("=== X SYNTHESIS MODE STOPPED BY USER ===") 2165 break 2166 except Exception as e: 2167 logger.error(f"Error in X synthesis loop: {e}") 2168 logger.info(f"Sleeping for {synthesis_interval} seconds due to error...") 2169 sleep(synthesis_interval) 2170 2171 cycle_count = 0 2172 start_time = time.time() 2173 last_synthesis_time = time.time() 2174 2175 while True: 2176 try: 2177 cycle_count += 1 2178 logger.info(f"=== X CYCLE {cycle_count} ===") 2179 2180 # Process X notifications (fetch, queue, and process) 2181 process_x_notifications(void_agent, x_client, testing_mode) 2182 2183 # Run periodic cleanup every N cycles 2184 if cleanup_interval > 0 and cycle_count % cleanup_interval == 0: 2185 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 2186 periodic_user_block_cleanup(letta_client, void_agent.id) 2187 2188 # Check if synthesis interval has passed 2189 if synthesis_interval > 0: 2190 current_time = time.time() 2191 if current_time - last_synthesis_time >= synthesis_interval: 2192 logger.info(f"{synthesis_interval/60:.1f} minutes have passed, triggering X synthesis") 2193 send_x_synthesis_message(letta_client, void_agent.id) 2194 last_synthesis_time = current_time 2195 2196 # Log cycle completion 2197 elapsed_time = time.time() - start_time 2198 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes") 2199 2200 sleep(FETCH_DELAY_SEC) 2201 2202 except KeyboardInterrupt: 2203 elapsed_time = time.time() - start_time 2204 logger.info("=== X BOT STOPPED BY USER ===") 2205 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes") 2206 break 2207 except Exception as e: 2208 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===") 2209 logger.error(f"Error details: {e}") 2210 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...") 2211 sleep(FETCH_DELAY_SEC * 2) 2212 2213def process_queue_only(testing_mode=False): 2214 """ 2215 Process all queued X mentions without fetching new ones. 2216 Useful for rate limit management - queue first, then process separately. 2217 2218 Args: 2219 testing_mode: If True, don't actually post to X and keep queue files 2220 """ 2221 logger.info("=== PROCESSING X QUEUE ONLY ===") 2222 2223 if testing_mode: 2224 logger.info("=== RUNNING IN X TESTING MODE ===") 2225 logger.info(" - No messages will be sent to X") 2226 logger.info(" - Queue files will not be deleted") 2227 2228 try: 2229 # Initialize void agent 2230 void_agent = initialize_x_void() 2231 logger.info(f"X void agent initialized: {void_agent.id}") 2232 2233 # Initialize X client 2234 x_client = create_x_client() 2235 logger.info("Connected to X API") 2236 2237 # Process the queue without fetching new mentions 2238 logger.info("Processing existing X queue...") 2239 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode) 2240 2241 logger.info("=== X QUEUE PROCESSING COMPLETE ===") 2242 2243 except Exception as e: 2244 logger.error(f"Error processing X queue: {e}") 2245 raise 2246 2247def x_notification_loop(): 2248 """ 2249 DEPRECATED: Old X notification loop using search-based mention detection. 2250 Use x_main_loop() instead for the full bot experience. 2251 """ 2252 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.") 2253 x_main_loop() 2254 2255if __name__ == "__main__": 2256 import sys 2257 import argparse 2258 2259 if len(sys.argv) > 1: 2260 if sys.argv[1] == "bot": 2261 # Main bot with optional --test flag and cleanup interval 2262 parser = argparse.ArgumentParser(description='X Void Bot') 2263 parser.add_argument('command', choices=['bot']) 2264 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)') 2265 parser.add_argument('--cleanup-interval', type=int, default=10, 2266 help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 2267 parser.add_argument('--synthesis-interval', type=int, default=600, 2268 help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 2269 parser.add_argument('--synthesis-only', action='store_true', 2270 help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 2271 args = parser.parse_args() 2272 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval, 2273 synthesis_interval=args.synthesis_interval, synthesis_only=args.synthesis_only) 2274 elif sys.argv[1] == "loop": 2275 x_notification_loop() 2276 elif sys.argv[1] == "reply": 2277 reply_to_cameron_post() 2278 elif sys.argv[1] == "me": 2279 get_my_user_info() 2280 elif sys.argv[1] == "search": 2281 test_search_mentions() 2282 elif sys.argv[1] == "queue": 2283 test_fetch_and_queue() 2284 elif sys.argv[1] == "thread": 2285 test_thread_context() 2286 elif sys.argv[1] == "process": 2287 # Process all queued mentions with optional --test flag 2288 testing_mode = "--test" in sys.argv 2289 process_queue_only(testing_mode=testing_mode) 2290 elif sys.argv[1] == "letta": 2291 # Use specific agent ID if provided, otherwise use from config 2292 agent_id = sys.argv[2] if len(sys.argv) > 2 else None 2293 test_letta_integration(agent_id) 2294 elif sys.argv[1] == "downrank": 2295 # View or manage downrank list 2296 if len(sys.argv) > 2 and sys.argv[2] == "list": 2297 downrank_users = load_downrank_users() 2298 if downrank_users: 2299 print(f"📋 Downrank users ({len(downrank_users)} total):") 2300 for user_id in sorted(downrank_users): 2301 print(f" - {user_id}") 2302 else: 2303 print("📋 No downrank users configured") 2304 else: 2305 print("Usage: python x.py downrank list") 2306 print(" list - Show all downranked user IDs") 2307 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list") 2308 else: 2309 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]") 2310 print(" bot - Run the main X bot (use --test for testing mode)") 2311 print(" Example: python x.py bot --test") 2312 print(" queue - Fetch and queue mentions only (no processing)") 2313 print(" process - Process all queued mentions only (no fetching)") 2314 print(" Example: python x.py process --test") 2315 print(" downrank - Manage downrank users (10% response rate)") 2316 print(" Example: python x.py downrank list") 2317 print(" loop - Run the old notification monitoring loop (deprecated)") 2318 print(" reply - Reply to Cameron's specific post") 2319 print(" me - Get authenticated user info and correct user ID") 2320 print(" search - Test search-based mention detection") 2321 print(" thread - Test thread context retrieval from queued mention") 2322 print(" letta - Test sending thread context to Letta agent") 2323 print(" Optional: python x.py letta <agent-id>") 2324 else: 2325 test_x_client()