a digital person for bluesky
import os
import logging
import requests
import yaml
import json
import hashlib
import random
import time
from typing import Optional, Dict, Any, List, Set
from datetime import datetime
from pathlib import Path
from requests_oauthlib import OAuth1
from rich import print as rprint
from rich.panel import Panel
from rich.text import Text

import bsky_utils

class XRateLimitError(Exception):
    """Exception raised when X API rate limit is exceeded"""
    pass


# Configure logging (will be updated by setup_logging_from_config if called)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")

def setup_logging_from_config(config_path: str = "x_config.yaml"):
    """Configure logging based on x_config.yaml settings."""
    try:
        config = load_x_config(config_path)
        logging_config = config.get('logging', {})
        log_level = logging_config.get('level', 'INFO').upper()

        # Convert string level to logging constant
        numeric_level = getattr(logging, log_level, logging.INFO)

        # Update the root logger level
        logging.getLogger().setLevel(numeric_level)
        # Update our specific logger level
        logger.setLevel(numeric_level)

        logger.info(f"Logging level set to {log_level}")

    except Exception as e:
        logger.warning(f"Failed to configure logging from config: {e}, using default INFO level")

# X-specific file paths
X_QUEUE_DIR = Path("x_queue")
X_CACHE_DIR = Path("x_cache")
X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")

class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions."""

    def __init__(self, api_key: str, user_id: str, access_token: str = None,
                 consumer_key: str = None, consumer_secret: str = None,
                 access_token_secret: str = None):
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"

        # Check if we have OAuth 1.0a credentials
        if (consumer_key and consumer_secret and access_token and access_token_secret):
            # Use OAuth 1.0a for User Context
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Use Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
        """Make a request to the X API with proper error handling and exponential backoff."""
        url = f"{self.base_url}{endpoint}"

        # Log the specific API call being made
logger.debug(f"Making X API request: {method} {endpoint}") 105 106 for attempt in range(max_retries): 107 try: 108 if method.upper() == "GET": 109 if self.oauth: 110 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth) 111 else: 112 response = requests.get(url, headers=self.headers, params=params) 113 elif method.upper() == "POST": 114 if self.oauth: 115 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth) 116 else: 117 response = requests.post(url, headers=self.headers, json=data) 118 else: 119 raise ValueError(f"Unsupported HTTP method: {method}") 120 121 response.raise_for_status() 122 return response.json() 123 124 except requests.exceptions.HTTPError as e: 125 if response.status_code == 401: 126 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials") 127 logger.error(f"Response: {response.text}") 128 return None # Don't retry auth failures 129 elif response.status_code == 403: 130 logger.error(f"X API forbidden with {self.auth_method} - check app permissions") 131 logger.error(f"Response: {response.text}") 132 return None # Don't retry permission failures 133 elif response.status_code == 429: 134 if attempt < max_retries - 1: 135 # Exponential backoff: 60s, 120s, 240s 136 backoff_time = 60 * (2 ** attempt) 137 logger.warning(f"X API rate limit exceeded on {method} {endpoint} (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry") 138 logger.error(f"Response: {response.text}") 139 time.sleep(backoff_time) 140 continue 141 else: 142 logger.error(f"X API rate limit exceeded on {method} {endpoint} - max retries reached") 143 logger.error(f"Response: {response.text}") 144 raise XRateLimitError(f"X API rate limit exceeded on {method} {endpoint}") 145 else: 146 if attempt < max_retries - 1: 147 # Exponential backoff for other HTTP errors too 148 backoff_time = 30 * (2 ** attempt) 149 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 150 logger.error(f"Response: {response.text}") 151 time.sleep(backoff_time) 152 continue 153 else: 154 logger.error(f"X API request failed after {max_retries} attempts: {e}") 155 logger.error(f"Response: {response.text}") 156 return None 157 158 except Exception as e: 159 if attempt < max_retries - 1: 160 backoff_time = 15 * (2 ** attempt) 161 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s") 162 time.sleep(backoff_time) 163 continue 164 else: 165 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}") 166 return None 167 168 return None 169 170 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]: 171 """ 172 Fetch mentions for the configured user. 

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            List of mention objects or None if request failed
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Ensure within API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if response:
            logger.debug(f"X API response: {response}")

        if response and "data" in response:
            mentions = response["data"]
            logger.info(f"Retrieved {len(mentions)} mentions")
            return mentions
        else:
            if response:
                logger.info(f"No mentions in response. Full response: {response}")
            else:
                logger.warning("Request failed - no response received")
            return []

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user by ID (distinct from get_user_info below, which returns the authenticated user)."""
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]:
        """
        Search for mentions using the search endpoint instead of the mentions endpoint.
        This might have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username
        """
        endpoint = "/tweets/search/recent"

        # Search for mentions of the username
        query = f"@{username}"

        params = {
            "query": query,
            "max_results": min(max(max_results, 10), 100),
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Searching for mentions of @{username}")
        response = self._make_request(endpoint, params)

        if response and "data" in response:
            tweets = response["data"]
            logger.info(f"Found {len(tweets)} mentions via search")
            return tweets
        else:
            if response:
                logger.info(f"No mentions found via search. Response: {response}")
            else:
                logger.warning("Search request failed")
            return []

    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)

        Returns:
            Dict with 'tweets' (ordered chronologically) and 'users' keys, or None if nothing was found
        """
        # Check cache first if enabled
        if use_cache:
            cached_data = get_cached_thread_context(conversation_id)
            if cached_data:
                return cached_data

        # First, get the original tweet directly since it might not appear in conversation search
        logger.debug(f"Getting thread context for conversation {conversation_id}")
        original_tweet = None
        try:
            endpoint = f"/tweets/{conversation_id}"
            params = {
                "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                "user.fields": "id,name,username",
                "expansions": "author_id"
            }
            logger.debug(f"Fetching original tweet: GET {endpoint}")
            response = self._make_request(endpoint, params)
            if response and "data" in response:
                original_tweet = response["data"]
                logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
        except Exception as e:
            logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")

        # Then search for all tweets in this conversation
        endpoint = "/tweets/search/recent"
        params = {
            "query": f"conversation_id:{conversation_id}",
            "max_results": 100,  # Get as many as possible
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
            "sort_order": "recency"  # Get newest first, we'll reverse later
        }

        # Add until_id parameter to exclude tweets after the mention being processed
        if until_id:
            params["until_id"] = until_id
            logger.info(f"Using until_id={until_id} to exclude future tweets")

        logger.info(f"Fetching thread context for conversation {conversation_id}")
        logger.debug(f"Searching conversation: GET {endpoint} with query={params['query']}")
        response = self._make_request(endpoint, params)

        tweets = []
        users_data = {}

        # Collect tweets from search
        if response and "data" in response:
            tweets.extend(response["data"])
            # Store user data for reference
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user

        # Add original tweet if we got it and it's not already in the list
        if original_tweet:
            tweet_ids = [t.get('id') for t in tweets]
            if original_tweet.get('id') not in tweet_ids:
                tweets.append(original_tweet)
                logger.info("Added original tweet to thread context")

        # Attempt to fill gaps by fetching referenced tweets that are missing
        # This helps with X API's incomplete conversation search results
        tweet_ids = set(t.get('id') for t in tweets)
        missing_tweet_ids = set()
        critical_missing_ids = set()

        # Collect referenced tweet IDs, prioritizing critical ones
        for tweet in tweets:
            referenced_tweets = tweet.get('referenced_tweets', [])
            for ref in referenced_tweets:
                ref_id = ref.get('id')
                ref_type = ref.get('type')
                if ref_id and ref_id not in tweet_ids:
                    missing_tweet_ids.add(ref_id)
                    # Prioritize direct replies and quoted tweets over retweets
                    if ref_type in ['replied_to', 'quoted']:
                        critical_missing_ids.add(ref_id)

        # For rate limit efficiency, only fetch critical missing tweets if we have many
        if len(missing_tweet_ids) > 10:
            logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
            missing_tweet_ids = critical_missing_ids

        # Context sufficiency check - skip backfill if we already have enough context
        if has_sufficient_context(tweets, missing_tweet_ids):
            logger.info("Thread has sufficient context, skipping missing tweet backfill")
            missing_tweet_ids = set()

        # Fetch missing referenced tweets in batches (more rate-limit friendly)
        if missing_tweet_ids:
            missing_list = list(missing_tweet_ids)

            # First, check cache for missing tweets
            cached_tweets = get_cached_tweets(missing_list)
            for tweet_id, cached_tweet in cached_tweets.items():
                if cached_tweet.get('conversation_id') == conversation_id:
                    tweets.append(cached_tweet)
                    tweet_ids.add(tweet_id)
                    logger.info(f"Retrieved missing tweet from cache: {tweet_id}")

                    # Add user data if available in cache
                    if cached_tweet.get('author_info'):
                        author_id = cached_tweet.get('author_id')
                        if author_id:
                            users_data[author_id] = cached_tweet['author_info']

            # Only fetch tweets that weren't found in cache
            uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]

            if uncached_ids:
                batch_size = 100  # X API limit for bulk tweet lookup

                for i in range(0, len(uncached_ids), batch_size):
                    batch_ids = uncached_ids[i:i + batch_size]
                    try:
                        endpoint = "/tweets"
                        params = {
                            "ids": ",".join(batch_ids),
                            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                            "user.fields": "id,name,username",
                            "expansions": "author_id"
                        }
                        logger.debug(f"Batch fetching missing tweets: GET {endpoint} (ids: {len(batch_ids)} tweets)")
                        response = self._make_request(endpoint, params)

                        if response and "data" in response:
                            fetched_tweets = []
                            batch_users_data = {}

                            for missing_tweet in response["data"]:
                                # Only add if it's actually part of this conversation
                                if missing_tweet.get('conversation_id') == conversation_id:
                                    tweets.append(missing_tweet)
                                    tweet_ids.add(missing_tweet.get('id'))
                                    fetched_tweets.append(missing_tweet)
                                    logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")

                            # Add user data if available
                            if "includes" in response and "users" in response["includes"]:
                                for user in response["includes"]["users"]:
                                    users_data[user["id"]] = user
                                    batch_users_data[user["id"]] = user

                            # Cache the newly fetched tweets
                            if fetched_tweets:
                                save_cached_tweets(fetched_tweets, batch_users_data)

                            logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")

                        # Handle partial success - log any missing tweets that weren't found
                        if response and "errors" in response:
                            for error in response["errors"]:
                                logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")

                    except Exception as e:
                        logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
            else:
                logger.info(f"All {len(missing_list)} missing tweets found in cache")

        if tweets:
            # Filter out tweets that occur after until_id (if specified)
            if until_id:
                original_count = len(tweets)
                # Convert until_id to int for comparison (Twitter IDs are sequential)
                until_id_int = int(until_id)
                tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
                filtered_count = len(tweets)
                if original_count != filtered_count:
                    logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")

            # Sort chronologically (oldest first)
            tweets.sort(key=lambda x: x.get('created_at', ''))
            logger.info(f"Retrieved {len(tweets)} tweets in thread")

            thread_data = {"tweets": tweets, "users": users_data}

            # Cache individual tweets from the thread for future backfill
            save_cached_tweets(tweets, users_data)

            # Cache the result
            if use_cache:
                save_cached_thread_context(conversation_id, thread_data)

            return thread_data
        else:
            logger.warning("No tweets found for thread context")
            return None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        logger.debug(f"Posting reply: POST {endpoint}")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
            return result
        else:
            logger.error("Failed to post reply")
            return None

    def post_tweet(self, tweet_text: str) -> Optional[Dict]:
        """
        Post a standalone tweet (not a reply).

        Args:
            tweet_text: The text content of the tweet (max 280 characters)

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        # Validate tweet length
        if len(tweet_text) > 280:
            logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
            return None

        payload = {
            "text": tweet_text
        }

        logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
        logger.debug(f"Posting tweet: POST {endpoint}")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info("Successfully posted tweet")
            return result
        else:
            logger.error("Failed to post tweet")
            return None

    def get_user_info(self, fields: Optional[str] = None) -> Optional[Dict]:
        """
        Get the authenticated user's information, using cached data when available.
        This reduces API calls significantly since user info rarely changes.

        Args:
            fields: Optional comma-separated list of user fields to fetch

        Returns:
            User data dict if successful, None if failed
        """
        # First try to get from cache
        cached_user_info = get_cached_user_info()
        if cached_user_info:
            # Check if cached data has all requested fields
            requested_fields = set(fields.split(',') if fields else ['id', 'username', 'name'])
            cached_fields = set(cached_user_info.keys())
            if requested_fields.issubset(cached_fields):
                return cached_user_info

        # Cache miss, expired, or missing requested fields - fetch from API
        logger.debug("Fetching fresh user info from /users/me API")
        endpoint = "/users/me"
        params = {"user.fields": fields or "id,username,name,description"}

        response = self._make_request(endpoint, params=params)
        if response and "data" in response:
            user_data = response["data"]
            # Cache the result for future use
            save_cached_user_info(user_data)
            return user_data
        else:
            logger.error("Failed to get user info from /users/me API")
            return None

    def get_username(self) -> Optional[str]:
        """
        Get the authenticated user's username, using cached data when available.
        This reduces API calls significantly since username rarely changes.

        Returns:
            Username string if successful, None if failed
        """
        user_info = self.get_user_info("id,username,name")
        return user_info.get("username") if user_info else None
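
# Illustrative usage sketch (not part of the original module): _make_request raises
# XRateLimitError once its retries for HTTP 429 are exhausted, so long-running callers
# may want to catch it and back off rather than crash. The function name and the
# 15-minute cool-down below are assumptions for illustration, not an established
# convention of this codebase.
def _example_search_with_rate_limit_guard(client: XClient, username: str) -> List[Dict]:
    """Sketch: wrap a client call so rate-limit exhaustion degrades gracefully."""
    try:
        return client.search_mentions(username, max_results=10) or []
    except XRateLimitError as e:
        logger.warning(f"Rate limited, skipping this polling cycle: {e}")
        time.sleep(15 * 60)  # hypothetical cool-down before the caller tries again
        return []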

def load_x_config(config_path: str = "x_config.yaml") -> Dict[str, Any]:
    """Load complete X configuration from x_config.yaml."""
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)

        if not config:
            raise ValueError(f"Empty or invalid configuration file: {config_path}")

        # Validate required sections
        x_config = config.get('x', {})
        letta_config = config.get('letta', {})

        if not x_config.get('api_key') or not x_config.get('user_id'):
            raise ValueError("X API key and user_id must be configured in x_config.yaml")

        if not letta_config.get('api_key') or not letta_config.get('agent_id'):
            raise ValueError("Letta API key and agent_id must be configured in x_config.yaml")

        return config
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise

def get_x_letta_config(config_path: str = "x_config.yaml") -> Dict[str, Any]:
    """Get Letta configuration from X config file."""
    config = load_x_config(config_path)
    return config['letta']

def create_x_client(config_path: str = "x_config.yaml") -> XClient:
    """Create and return an X client with configuration loaded from file."""
    config = load_x_config(config_path)
    x_config = config['x']
    return XClient(
        api_key=x_config['api_key'],
        user_id=x_config['user_id'],
        access_token=x_config.get('access_token'),
        consumer_key=x_config.get('consumer_key'),
        consumer_secret=x_config.get('consumer_secret'),
        access_token_secret=x_config.get('access_token_secret')
    )
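
# For reference, the loaders above expect an x_config.yaml shaped roughly like the
# following. Only keys this module actually reads are shown; the values are
# placeholders, and any extra sections in the real file are simply ignored here.
#
#   x:
#     api_key: "YOUR_BEARER_TOKEN"
#     user_id: "1234567890"
#     access_token: "OPTIONAL_USER_CONTEXT_TOKEN"
#     consumer_key: "OPTIONAL_OAUTH1_CONSUMER_KEY"
#     consumer_secret: "OPTIONAL_OAUTH1_CONSUMER_SECRET"
#     access_token_secret: "OPTIONAL_OAUTH1_TOKEN_SECRET"
#   letta:
#     api_key: "YOUR_LETTA_API_KEY"
#     agent_id: "your-agent-id"
#     timeout: 600
#   logging:
#     level: INFO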

def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Convert a mention object to a YAML string for better AI comprehension.
    Similar to thread_to_yaml_string in bsky_utils.py.
    """
    # Extract relevant fields
    simplified_mention = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': mention.get('author_id'),
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id')
    }

    # Add user information if available
    if users_data and mention.get('author_id') in users_data:
        user = users_data[mention.get('author_id')]
        simplified_mention['author'] = {
            'username': user.get('username'),
            'name': user.get('name')
        }

    return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)

def thread_to_yaml_string(thread_data: Dict) -> str:
    """
    Convert X thread context to a YAML string for AI comprehension.
    Similar to Bluesky's thread_to_yaml_string function.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()

    Returns:
        YAML string representation of the thread
    """
    if not thread_data or "tweets" not in thread_data:
        return "conversation: []\n"

    tweets = thread_data["tweets"]
    users_data = thread_data.get("users", {})

    simplified_thread = {
        "conversation": []
    }

    for tweet in tweets:
        # Get user info
        author_id = tweet.get('author_id')
        author_info = {}
        if author_id and author_id in users_data:
            user = users_data[author_id]
            author_info = {
                'username': user.get('username'),
                'name': user.get('name')
            }

        # Build tweet object (simplified for AI consumption)
        tweet_obj = {
            'text': tweet.get('text'),
            'created_at': tweet.get('created_at'),
            'author': author_info,
            'author_id': author_id  # Include user ID for block management
        }

        simplified_thread["conversation"].append(tweet_obj)

    return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)
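
# For orientation, thread_to_yaml_string emits output shaped roughly like this
# (field names come from the code above; the values here are invented placeholders):
#
#   conversation:
#   - text: "@void hello there"
#     created_at: '2025-01-01T00:00:00.000Z'
#     author:
#       username: somehandle
#       name: Some Person
#     author_id: '1234567890'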

def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
    """
    Ensure all users in the thread have their X user blocks attached.
    Creates blocks with initial content including their handle if they don't exist.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
        agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
    """
    if not thread_data or "users" not in thread_data:
        return

    try:
        from tools.blocks import attach_x_user_blocks, x_user_note_set
        from letta_client import Letta

        # Get Letta client and agent_id from X config
        config = get_x_letta_config()
        client = Letta(token=config['api_key'], timeout=config['timeout'])

        # Use provided agent_id or get from config
        if agent_id is None:
            agent_id = config['agent_id']

        # Get agent info to create a mock agent_state for the functions
        class MockAgentState:
            def __init__(self, agent_id):
                self.id = agent_id

        agent_state = MockAgentState(agent_id)

        users_data = thread_data["users"]
        user_ids = list(users_data.keys())

        if not user_ids:
            return

        logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")

        # Get current blocks to check which users already have blocks with content
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        existing_user_blocks = {}

        for block in current_blocks:
            if block.label.startswith("x_user_"):
                user_id = block.label.replace("x_user_", "")
                existing_user_blocks[user_id] = block

        # Attach all user blocks (this will create missing ones with basic content)
        attach_result = attach_x_user_blocks(user_ids, agent_state)
        logger.info(f"X user block attachment result: {attach_result}")

        # For newly created blocks, update with user handle information
        for user_id in user_ids:
            if user_id not in existing_user_blocks:
                user_info = users_data[user_id]
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')

                # Set initial content with handle information
                initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."

                try:
                    x_user_note_set(user_id, initial_content, agent_state)
                    logger.info(f"Set initial content for X user {user_id} (@{username})")
                except Exception as e:
                    logger.error(f"Failed to set initial content for X user {user_id}: {e}")

    except Exception as e:
        logger.error(f"Error ensuring X user blocks: {e}")


# X Caching and Queue System Functions

def load_last_seen_id() -> Optional[str]:
    """Load the last seen mention ID for incremental fetching."""
    if X_LAST_SEEN_FILE.exists():
        try:
            with open(X_LAST_SEEN_FILE, 'r') as f:
                data = json.load(f)
                return data.get('last_seen_id')
        except Exception as e:
            logger.error(f"Error loading last seen ID: {e}")
    return None

def save_last_seen_id(mention_id: str):
    """Save the last seen mention ID."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_LAST_SEEN_FILE, 'w') as f:
            json.dump({
                'last_seen_id': mention_id,
                'updated_at': datetime.now().isoformat()
            }, f)
        logger.debug(f"Saved last seen ID: {mention_id}")
    except Exception as e:
        logger.error(f"Error saving last seen ID: {e}")

def load_processed_mentions() -> set:
    """Load the set of processed mention IDs."""
    if X_PROCESSED_MENTIONS_FILE.exists():
        try:
            with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
                data = json.load(f)
                # Keep only recent entries (last 10000)
                if len(data) > 10000:
                    data = data[-10000:]
                    save_processed_mentions(set(data))
                return set(data)
        except Exception as e:
            logger.error(f"Error loading processed mentions: {e}")
    return set()

def save_processed_mentions(processed_set: set):
    """Save the set of processed mention IDs."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed mentions: {e}")

def load_downrank_users() -> Set[str]:
    """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
    try:
        if not X_DOWNRANK_USERS_FILE.exists():
            return set()

        downrank_users = set()
        with open(X_DOWNRANK_USERS_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                # Skip empty lines and comments
                if line and not line.startswith('#'):
                    downrank_users.add(line)

        logger.info(f"Loaded {len(downrank_users)} downrank users")
        return downrank_users
    except Exception as e:
        logger.error(f"Error loading downrank users: {e}")
        return set()

def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
    """
    Check if we should respond to a downranked user.
    Returns True 10% of the time for downranked users, True 100% of the time for others.
    """
    if user_id not in downrank_users:
        return True

    # 10% chance for downranked users
    should_respond = random.random() < 0.1
    logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
    return should_respond
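
# The downrank list above is a plain text file read one entry per line. Something like
# the following would be a valid x_downrank_users.txt (the IDs are placeholders):
#
#   # users whose mentions only get a response ~10% of the time
#   1234567890123456789
#   9876543210987654321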

def save_mention_to_queue(mention: Dict):
    """Save a mention to the queue directory for async processing."""
    try:
        mention_id = mention.get('id')
        if not mention_id:
            logger.error("Mention missing ID, cannot queue")
            return

        # Check if already processed
        processed_mentions = load_processed_mentions()
        if mention_id in processed_mentions:
            logger.debug(f"Mention {mention_id} already processed, skipping")
            return

        # Create queue directory
        X_QUEUE_DIR.mkdir(exist_ok=True)

        # Create filename using hash (similar to Bluesky system)
        mention_str = json.dumps(mention, sort_keys=True)
        mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
        filename = f"x_mention_{mention_hash}.json"

        queue_file = X_QUEUE_DIR / filename

        # Save mention data with enhanced debugging information
        mention_data = {
            'mention': mention,
            'queued_at': datetime.now().isoformat(),
            'type': 'x_mention',
            # Debug info for conversation tracking
            'debug_info': {
                'mention_id': mention.get('id'),
                'author_id': mention.get('author_id'),
                'conversation_id': mention.get('conversation_id'),
                'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
                'referenced_tweets': mention.get('referenced_tweets', []),
                'text_preview': mention.get('text', '')[:200],
                'created_at': mention.get('created_at'),
                'public_metrics': mention.get('public_metrics', {}),
                'context_annotations': mention.get('context_annotations', [])
            }
        }

        with open(queue_file, 'w') as f:
            json.dump(mention_data, f, indent=2)

        logger.info(f"Queued X mention {mention_id} -> {filename}")

    except Exception as e:
        logger.error(f"Error saving mention to queue: {e}")

# X Cache Functions
def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
    """Load cached thread context if available."""
    cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
    if cache_file.exists():
        try:
            with open(cache_file, 'r') as f:
                cached_data = json.load(f)
            # Check if cache is recent (within 1 hour)
            from datetime import datetime, timedelta
            cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
            if datetime.now() - cached_time < timedelta(hours=1):
                logger.info(f"Using cached thread context for {conversation_id}")
                return cached_data.get('thread_data')
        except Exception as e:
            logger.warning(f"Error loading cached thread context: {e}")
    return None

def save_cached_thread_context(conversation_id: str, thread_data: Dict):
    """Save thread context to cache."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)
        cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"

        cache_data = {
            'conversation_id': conversation_id,
            'thread_data': thread_data,
            'cached_at': datetime.now().isoformat()
        }

        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)

        logger.debug(f"Cached thread context for {conversation_id}")
    except Exception as e:
        logger.error(f"Error caching thread context: {e}")

def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
    """
    Load cached individual tweets if available.
    Returns dict mapping tweet_id -> tweet_data for found tweets.
    """
    cached_tweets = {}

    for tweet_id in tweet_ids:
        cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
        if cache_file.exists():
            try:
                with open(cache_file, 'r') as f:
                    cached_data = json.load(f)

                # Use longer cache times for older tweets (24 hours vs 1 hour)
                from datetime import datetime, timedelta
                cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
                tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')

                # Parse tweet creation time to determine age
                try:
                    from dateutil.parser import parse
                    tweet_age = datetime.now() - parse(tweet_created)
                    cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
                except Exception:
                    cache_duration = timedelta(hours=1)  # Default to 1 hour if parsing fails

                if datetime.now() - cached_time < cache_duration:
                    cached_tweets[tweet_id] = cached_data.get('tweet_data')
                    logger.debug(f"Using cached tweet {tweet_id}")

            except Exception as e:
                logger.warning(f"Error loading cached tweet {tweet_id}: {e}")

    return cached_tweets

def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
    """Save individual tweets to cache for future reuse."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)

        for tweet in tweets_data:
            tweet_id = tweet.get('id')
            if not tweet_id:
                continue

            cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"

            # Include user data if available
            tweet_with_user = tweet.copy()
            if users_data and tweet.get('author_id') in users_data:
                tweet_with_user['author_info'] = users_data[tweet.get('author_id')]

            cache_data = {
                'tweet_id': tweet_id,
                'tweet_data': tweet_with_user,
                'cached_at': datetime.now().isoformat()
            }

            with open(cache_file, 'w') as f:
                json.dump(cache_data, f, indent=2)

            logger.debug(f"Cached individual tweet {tweet_id}")

    except Exception as e:
        logger.error(f"Error caching individual tweets: {e}")

def get_cached_user_info() -> Optional[Dict]:
    """Load cached user info if available and not expired."""
    cache_file = X_CACHE_DIR / "user_info.json"
    if cache_file.exists():
        try:
            with open(cache_file, 'r') as f:
                cached_data = json.load(f)
            # Check if cache is recent (within 24 hours)
            from datetime import datetime, timedelta
            cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
            if datetime.now() - cached_time < timedelta(hours=24):
                logger.debug("Using cached user info")
                return cached_data.get('data')
            else:
                logger.debug("Cached user info expired (>24 hours old)")
        except Exception as e:
            logger.warning(f"Error loading cached user info: {e}")
    return None

def save_cached_user_info(user_data: Dict):
    """Save user info to cache."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)
        cache_file = X_CACHE_DIR / "user_info.json"

        from datetime import datetime
        cache_data = {
            'data': user_data,
            'cached_at': datetime.now().isoformat()
        }

        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)

        logger.debug(f"Cached user info: {user_data.get('username')}")

    except Exception as e:
        logger.error(f"Error caching user info: {e}")
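
# Cache layout produced by the helpers above (every JSON file carries a 'cached_at'
# timestamp that drives expiry):
#   x_cache/thread_<conversation_id>.json  - whole-thread context, reused for 1 hour
#   x_cache/tweet_<tweet_id>.json          - single tweets, 1 hour (24 hours once the
#                                            tweet itself is older than a day)
#   x_cache/user_info.json                 - authenticated /users/me data, 24 hours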

def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
    """
    Determine if we have sufficient context to skip backfilling missing tweets.

    Args:
        tweets: List of tweets already in the thread
        missing_tweet_ids: Set of missing tweet IDs we'd like to fetch

    Returns:
        True if context is sufficient, False if backfill is needed
    """
    # If no missing tweets, context is sufficient
    if not missing_tweet_ids:
        return True

    # If we have a substantial conversation (5+ tweets), likely sufficient
    if len(tweets) >= 5:
        logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
        return True

    # If only a few missing tweets and we have some context, might be enough
    if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
        logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
        return True

    # Check if we have conversational flow (mentions between users)
    has_conversation_flow = False
    for tweet in tweets:
        text = tweet.get('text', '').lower()
        # Look for mentions, replies, or conversational indicators
        if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1:
            has_conversation_flow = True
            break

    # If we have clear conversational flow and reasonable length, sufficient
    if has_conversation_flow and len(tweets) >= 2:
        logger.debug("Thread has conversational flow, considering sufficient")
        return True

    # Otherwise, we need to backfill
    logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
    return False

def fetch_and_queue_mentions(username: str) -> int:
    """
    Single-pass function to fetch new mentions and queue them.
    Returns the number of new mentions found.
    """
    try:
        client = create_x_client()

        # Load last seen ID for incremental fetching
        last_seen_id = load_last_seen_id()

        logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")

        # Search for mentions - this calls GET /2/tweets/search/recent
        logger.debug(f"Calling search_mentions API for @{username}")
        mentions = client.search_mentions(
            username=username,
            since_id=last_seen_id,
            max_results=100  # Get as many as possible
        )

        if not mentions:
            logger.info("No new mentions found")
            return 0

        # Process mentions (newest first, so reverse to process oldest first)
        mentions.reverse()
        new_count = 0

        for mention in mentions:
            save_mention_to_queue(mention)
            new_count += 1

        # Update last seen ID to the most recent mention
        if mentions:
            most_recent_id = mentions[-1]['id']  # Last after reverse = most recent
            save_last_seen_id(most_recent_id)

        logger.info(f"Queued {new_count} new X mentions")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queuing mentions: {e}")
        return 0
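
# Illustrative sketch (not called anywhere in this module): fetch_and_queue_mentions is
# a single pass, so a driver loop has to supply the cadence. The function name and the
# 5-minute default interval are assumptions for illustration only.
def _example_mention_poll_loop(interval_seconds: int = 300) -> None:
    """Sketch: repeatedly queue new mentions for the authenticated account."""
    username = create_x_client().get_username()
    if not username:
        logger.error("Could not resolve authenticated username; not polling")
        return
    while True:
        queued = fetch_and_queue_mentions(username)
        logger.info(f"Poll cycle complete, queued {queued} mentions")
        time.sleep(interval_seconds)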

# Simple test function
def get_my_user_info():
    """Get the authenticated user's information to find the correct user ID."""
    try:
        client = create_x_client()

        # Get authenticated user info using cached method
        print("Fetching authenticated user information...")
        user_data = client.get_user_info("id,name,username,description")

        if user_data:
            print("✅ Found authenticated user:")
            print(f"   ID: {user_data.get('id')}")
            print(f"   Username: @{user_data.get('username')}")
            print(f"   Name: {user_data.get('name')}")
            print(f"   Description: {user_data.get('description', 'N/A')[:100]}...")
            print("\n🔧 Update your x_config.yaml with:")
            print(f"   user_id: \"{user_data.get('id')}\"")
            return user_data
        else:
            print("❌ Failed to get user information")
            print("No user data returned from the /users/me endpoint")
            return None

    except Exception as e:
        print(f"Error getting user info: {e}")
        return None

def test_search_mentions():
    """Test the search-based mention detection."""
    try:
        client = create_x_client()

        # First get our username
        username = client.get_username()
        if not username:
            print("❌ Could not get username")
            return
        print(f"🔍 Searching for mentions of @{username}")

        mentions = client.search_mentions(username, max_results=5)

        if mentions:
            print(f"✅ Found {len(mentions)} mentions via search:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
        else:
            print("No mentions found via search")

    except Exception as e:
        print(f"Search test failed: {e}")

def test_fetch_and_queue():
    """Test the single-pass fetch and queue function."""
    try:
        client = create_x_client()

        # Get our username
        username = client.get_username()
        if not username:
            print("❌ Could not get username")
            return
        print(f"🔄 Fetching and queueing mentions for @{username}")

        # Show current state
        last_seen = load_last_seen_id()
        print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")

        # Fetch and queue
        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            print(f"✅ Queued {new_count} new mentions")
            print("📁 Check ./x_queue/ directory for queued mentions")

            # Show updated state
            new_last_seen = load_last_seen_id()
            print(f"📍 Updated last seen ID: {new_last_seen}")
        else:
            print("ℹ️ No new mentions to queue")

    except Exception as e:
        print(f"Fetch and queue test failed: {e}")

def test_thread_context():
    """Test thread context retrieval from a queued mention."""
    try:
        import json

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        print(f"📄 Using mention: {mention.get('id')}")
        print(f"📝 Text: {mention.get('text')}")

        # Check if it has a conversation_id
        conversation_id = mention.get('conversation_id')
        if not conversation_id:
            print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        client = create_x_client()
        thread_data = client.get_thread_context(conversation_id)

        if thread_data:
            tweets = thread_data.get('tweets', [])
            print(f"✅ Retrieved thread with {len(tweets)} tweets")

            # Convert to YAML
            yaml_thread = thread_to_yaml_string(thread_data)

            # Save thread context for inspection
            thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
            with open(thread_file, 'w') as f:
                f.write(yaml_thread)

            print(f"💾 Saved thread context to: {thread_file}")
            print("\n📋 Thread preview:")
            print(yaml_thread)
        else:
            print("❌ Failed to retrieve thread context")

    except Exception as e:
        print(f"Thread context test failed: {e}")

def test_letta_integration(agent_id: str = None):
    """Test sending X thread context to a Letta agent."""
    try:
        from letta_client import Letta
        import json
        import yaml

        # Load X config to access the letta section
        try:
            x_config = load_x_config()
            letta_config = x_config.get('letta', {})
            api_key = letta_config.get('api_key')
            config_agent_id = letta_config.get('agent_id')

            # Use agent_id from config if not provided as a parameter
            if not agent_id:
                if config_agent_id:
                    agent_id = config_agent_id
                    print(f"ℹ️ Using agent_id from config: {agent_id}")
                else:
                    print("❌ No agent_id found in x_config.yaml")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    agent_id: your-agent-id")
                    return
            else:
                print(f"ℹ️ Using provided agent_id: {agent_id}")

            if not api_key:
                # Try loading from environment as fallback
                import os
                api_key = os.getenv('LETTA_API_KEY')
                if not api_key:
                    print("❌ LETTA_API_KEY not found in x_config.yaml or environment")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    api_key: your-letta-api-key")
                    return
                else:
                    print("ℹ️ Using LETTA_API_KEY from environment")
            else:
                print("ℹ️ Using LETTA_API_KEY from x_config.yaml")

        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return

        letta_client = Letta(token=api_key, timeout=600)
        print(f"🤖 Connected to Letta, using agent: {agent_id}")

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        conversation_id = mention.get('conversation_id')

        if not conversation_id:
            print("❌ No conversation_id found in mention.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get thread context
        x_client = create_x_client()
        thread_data = x_client.get_thread_context(conversation_id)

        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        # Convert to YAML
        yaml_thread = thread_to_yaml_string(thread_data)

        # Create prompt for the agent
        prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.

Here is the thread context:

{yaml_thread}

Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""

        prompt_char_count = len(prompt)
        print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")

        # Print the prompt in a rich panel
        rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))

        # Send to Letta agent using streaming
        message_stream = letta_client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": prompt}],
            stream_tokens=False,
            max_steps=10
        )

        print("🔄 Streaming response from agent...")
        response_text = ""

        for chunk in message_stream:
            print(chunk)
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'assistant_message':
                    print(f"🤖 Agent response: {chunk.content}")
                    response_text = chunk.content
                elif chunk.message_type == 'reasoning_message':
                    print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
                elif chunk.message_type == 'tool_call_message':
                    print(f"🔧 Agent tool call: {chunk.tool_call.name}")

        if response_text:
            print("\n✅ Agent generated response:")
            print(f"📝 Response: {response_text}")
        else:
            print("❌ No response generated by agent")

    except Exception as e:
        print(f"Letta integration test failed: {e}")
        import traceback
        traceback.print_exc()

def test_x_client():
    """Test the X client by fetching mentions."""
    try:
        client = create_x_client()
        mentions = client.get_mentions(max_results=5)

        if mentions:
            print(f"Successfully retrieved {len(mentions)} mentions:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
        else:
            print("No mentions retrieved")

    except Exception as e:
        print(f"Test failed: {e}")

def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not a Bearer token.
    The current Bearer token is Application-Only, which can't post.
    """
    try:
        client = create_x_client()

        # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
        cameron_post_id = "1950690566909710618"

        # Simple reply message
        reply_text = "Hello from void! 🤖 Testing X integration."

        print(f"Attempting to reply to post {cameron_post_id}")
        print(f"Reply text: {reply_text}")
        print("\nNOTE: This will fail with current Bearer token (Application-Only)")
        print("Posting requires OAuth User Context authentication")

        result = client.post_reply(reply_text, cameron_post_id)

        if result:
            print("✅ Successfully posted reply!")
            print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
        else:
            print("❌ Failed to post reply (expected with current auth)")

    except Exception as e:
        print(f"Reply failed: {e}")

def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
    """
    Process an X mention and generate a reply using the Letta agent.
    Similar to bsky.py process_mention but for X/Twitter.

    Args:
        void_agent: The Letta agent instance
        x_client: The X API client
        mention_data: The mention data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, don't actually post to X

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
    """
    try:
        logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")

        # Extract mention details
        if isinstance(mention_data, dict):
            # Handle both raw mention and queued mention formats
            if 'mention' in mention_data:
                mention = mention_data['mention']
            else:
                mention = mention_data
        else:
            mention = mention_data

        mention_id = mention.get('id')
        mention_text = mention.get('text', '')
        author_id = mention.get('author_id')
        conversation_id = mention.get('conversation_id')
        in_reply_to_user_id = mention.get('in_reply_to_user_id')
        referenced_tweets = mention.get('referenced_tweets', [])

        # Check downrank list - only respond to downranked users 10% of the time
        downrank_users = load_downrank_users()
        if not should_respond_to_downranked_user(str(author_id), downrank_users):
            logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
            return "no_reply"

        # Enhanced conversation tracking for debug - especially important for Grok handling
        logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
        logger.info(f"   Author ID: {author_id}")
        logger.info(f"   Conversation ID: {conversation_id}")
        logger.info(f"   In Reply To User ID: {in_reply_to_user_id}")
        logger.info(f"   Referenced Tweets: {len(referenced_tweets)} items")
        for i, ref in enumerate(referenced_tweets[:3]):  # Log first 3 referenced tweets
            logger.info(f"     Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
        logger.info(f"   Text preview: {mention_text[:100]}...")

        if not conversation_id:
            logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
            return None

        # Get thread context with caching enabled for efficiency
        # Use mention_id as until_id to exclude tweets that occurred after this mention
        try:
            thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
            if not thread_data:
                logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
                return False

            # If this mention references a specific tweet, ensure we have that tweet in context
            if referenced_tweets:
                for ref in referenced_tweets:
                    if ref.get('type') == 'replied_to':
                        ref_id = ref.get('id')
                        # Check if the referenced tweet is in our thread data
                        thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
                        if ref_id and ref_id not in thread_tweet_ids:
                            logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
                            try:
                                # Fetch the missing referenced tweet directly
                                endpoint = f"/tweets/{ref_id}"
                                params = {
                                    "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                                    "user.fields": "id,name,username",
                                    "expansions": "author_id"
                                }
                                logger.debug(f"Fetching individual missing tweet: GET {endpoint}")
                                response = x_client._make_request(endpoint, params)
                                if response and "data" in response:
                                    missing_tweet = response["data"]
                                    if missing_tweet.get('conversation_id') == conversation_id:
                                        # Add to thread data
                                        if 'tweets' not in thread_data:
                                            thread_data['tweets'] = []
                                        thread_data['tweets'].append(missing_tweet)

                                        # Add user data if available
                                        if "includes" in response and "users" in response["includes"]:
                                            if 'users' not in thread_data:
                                                thread_data['users'] = {}
                                            for user in response["includes"]["users"]:
                                                thread_data['users'][user["id"]] = user

                                        logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
                                    else:
                                        logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
                            except Exception as e:
                                logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")

            # Enhanced thread context debugging
            logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
            thread_posts = thread_data.get('tweets', [])
            thread_users = thread_data.get('users', {})
            logger.info(f"   Posts in thread: {len(thread_posts)}")
            logger.info(f"   Users in thread: {len(thread_users)}")

            # Log thread participants for Grok detection
            for user_id, user_info in thread_users.items():
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')
                is_verified = user_info.get('verified', False)
                logger.info(f"   User {user_id}: @{username} ({name}) verified={is_verified}")

                # Special logging for Grok or AI-related users
                if 'grok' in username.lower() or 'grok' in name.lower():
                    logger.info(f"   🤖 DETECTED GROK USER: @{username} ({name})")

            # Log conversation structure
            for i, post in enumerate(thread_posts[:5]):  # Log first 5 posts
                post_id = post.get('id')
                post_author = post.get('author_id')
                post_text = post.get('text', '')[:50]
                is_reply = 'in_reply_to_user_id' in post
                logger.info(f"   Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")

        except Exception as e:
            logger.error(f"❌ Error getting thread context: {e}")
            return False

        # Convert to YAML string
        thread_context = thread_to_yaml_string(thread_data)
        logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")

        # Save comprehensive conversation data for debugging
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
            debug_dir.mkdir(parents=True, exist_ok=True)

            # Save raw thread data (JSON)
            with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
                json.dump(thread_data, f, indent=2)

            # Save YAML thread context
            with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
                f.write(thread_context)

            # Save mention processing debug info
            debug_info = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'author_id': author_id,
                'in_reply_to_user_id': in_reply_to_user_id,
                'referenced_tweets': referenced_tweets,
                'thread_stats': {
                    'total_posts': len(thread_posts),
                    'total_users': len(thread_users),
                    'yaml_length': len(thread_context)
                },
                'users_in_conversation': {
                    user_id: {
                        'username': user_info.get('username'),
                        'name': user_info.get('name'),
                        'verified': user_info.get('verified', False),
                        'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
                    }
                    for user_id, user_info in thread_users.items()
                }
            }

            with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
                json.dump(debug_info, f, indent=2)

            logger.info(f"💾 Saved conversation debug data to: {debug_dir}")

        except Exception as debug_error:
            logger.warning(f"Failed to save debug data: {debug_error}")
            # Continue processing even if debug save fails

        # Check for #voidstop
        if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
            logger.info("Found #voidstop, skipping this mention")
            return True

        # Ensure X user blocks are attached
        try:
            ensure_x_user_blocks_attached(thread_data, void_agent.id)
        except Exception as e:
            logger.warning(f"Failed to ensure X user blocks: {e}")
            # Continue without user blocks rather than failing completely

        # Create prompt for Letta agent
        author_info = thread_data.get('users', {}).get(author_id, {})
        author_username = author_info.get('username', 'unknown')
        author_name = author_info.get('name', author_username)

        prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

If you need to update user information, use the x_user_* tools.

To reply, use the add_post_to_x_thread tool:
- Each call creates one post (max 280 characters)
- For most responses, a single call is sufficient
- Only use multiple calls for threaded replies when:
  * The topic requires extended explanation that cannot fit in 280 characters
  * You're explicitly asked for a detailed/long response
  * The conversation naturally benefits from a structured multi-part answer
- Avoid unnecessary threads - be concise when possible"""

        # Log mention processing
        title = f"X MENTION FROM @{author_username}"
        print(f"\n{title}")
        print(f" {'─' * len(title)}")
        for line in mention_text.split('\n'):
            print(f" {line}")

        # Send to Letta agent
        from letta_client import Letta

        config = get_x_letta_config()
        letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

        prompt_char_count = len(prompt)
        logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")

        try:
            # Use streaming to avoid timeout errors
            message_stream = letta_client.agents.messages.create_stream(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}],
                stream_tokens=False,
                max_steps=100
            )

            # Collect streaming response (simplified version of bsky.py logic)
            all_messages = []
            for chunk in message_stream:
                if hasattr(chunk, 'message_type'):
                    if chunk.message_type == 'reasoning_message':
                        print("\n◆ Reasoning")
                        print(" ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f" {line}")
                    elif chunk.message_type == 'tool_call_message':
                        tool_name = chunk.tool_call.name
                        if tool_name == 'add_post_to_x_thread':
                            try:
                                args = json.loads(chunk.tool_call.arguments)
                                text = args.get('text', '')
                                if text:
                                    print("\n✎ X Post")
                                    print(" ────────")
                                    for line in text.split('\n'):
                                        print(f" {line}")
                            except Exception:
                                pass
                        elif tool_name == 'halt_activity':
                            logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
                            if queue_filepath and queue_filepath.exists():
                                queue_filepath.unlink()
                                logger.info(f"Deleted queue file: {queue_filepath.name}")
                            logger.info("=== X BOT TERMINATED BY AGENT ===")
                            exit(0)
                    elif chunk.message_type == 'tool_return_message':
                        tool_name = chunk.name
                        status = chunk.status
                        if status == 'success' and tool_name == 'add_post_to_x_thread':
                            print("\n✓ X Post Queued")
                            print(" ──────────────")
                            print(" Post queued successfully")
                    elif chunk.message_type == 'assistant_message':
                        print("\n▶ Assistant Response")
                        print(" ──────────────────")
                        for line in chunk.content.split('\n'):
                            print(f" {line}")

                all_messages.append(chunk)
                if str(chunk) == 'done':
                    break

            # Convert streaming response for compatibility
            message_response = type('StreamingResponse', (), {
                'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
            })()

        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            raise

        # Extract successful add_post_to_x_thread tool calls
        reply_candidates = []
        tool_call_results = {}
        ignored_notification = False
        ack_note = None  # Track any note from annotate_ack tool

        # First pass: collect tool return statuses
        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_x_thread':
                    tool_call_results[message.tool_call_id] = message.status
                elif message.name == 'ignore_notification':
                    if message.status == 'success':
                        ignored_notification = True
                        logger.info("🚫 X notification ignored")

        # Second pass: collect successful tool calls
        for message in message_response.messages:
            if hasattr(message, 'tool_call') and message.tool_call:
                # Collect annotate_ack tool calls
                if message.tool_call.name == 'annotate_ack':
                    try:
                        args = json.loads(message.tool_call.arguments)
                        note = args.get('note', '')
                        if note:
                            ack_note = note
                            logger.debug(f"Found annotate_ack with note: {note[:50]}...")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse annotate_ack arguments: {e}")

                # Collect add_post_to_x_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_x_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            if reply_text:
                                reply_candidates.append(reply_text)
                                logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")

        # Save agent response data to debug folder
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"

            # Save complete agent interaction
            agent_response_data = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'prompt_sent': prompt,
                'reply_candidates': reply_candidates,
                'ignored_notification': ignored_notification,
                'ack_note': ack_note,
                'tool_call_results': tool_call_results,
                'all_messages': []
            }

            # Convert messages to serializable format
            for message in message_response.messages:
                msg_data = {
                    'message_type': getattr(message, 'message_type', 'unknown'),
                    'content': getattr(message, 'content', ''),
                    'reasoning': getattr(message, 'reasoning', ''),
                    'status': getattr(message, 'status', ''),
                    'name': getattr(message, 'name', ''),
                }

                if hasattr(message, 'tool_call') and message.tool_call:
                    msg_data['tool_call'] = {
                        'name': message.tool_call.name,
                        'arguments': message.tool_call.arguments,
                        'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
                    }

                agent_response_data['all_messages'].append(msg_data)

            with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
                json.dump(agent_response_data, f, indent=2)

            logger.info("💾 Saved agent response debug data")

        except Exception as debug_error:
            logger.warning(f"Failed to save agent response debug data: {debug_error}")

        # Handle conflicts
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
            return False

        if reply_candidates:
            # Post replies to X
            logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")

            if len(reply_candidates) == 1:
                content = reply_candidates[0]
                title = f"Reply to @{author_username}"
            else:
                content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
                title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"

            print(f"\n{title}")
            print(f" {'─' * len(title)}")
            for line in content.split('\n'):
                print(f" {line}")

            if testing_mode:
                logger.info("TESTING MODE: Skipping actual X post")
                return True
            else:
                # Post to X using thread approach
                success = post_x_thread_replies(x_client, mention_id, reply_candidates)
                if success:
                    logger.info(f"Successfully replied to @{author_username} on X")

                    # Acknowledge the post we're replying to
                    try:
                        ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
                        if ack_result:
                            if ack_note:
                                logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
                            else:
                                logger.info(f"Successfully acknowledged X post from @{author_username}")
                        else:
                            logger.warning(f"Failed to acknowledge X post from @{author_username}")
                    except Exception as e:
                        logger.error(f"Error acknowledging X post from @{author_username}: {e}")
                        # Don't fail the entire operation if acknowledgment fails

                    return True
                else:
                    logger.error(f"Failed to send reply to @{author_username} on X")
                    return False
        else:
            if ignored_notification:
                logger.info(f"X mention from @{author_username} was explicitly ignored")
                return "ignored"
            else:
                logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
                return False  # Keep in queue for retry instead of removing

    except Exception as e:
        logger.error(f"Error processing X mention: {e}")
        return False

def acknowledge_x_post(x_client, post_id, note=None):
    """
    Acknowledge an X post that we replied to.
    Uses the same Bluesky client and uploads to the void data repository on atproto,
    just like Bluesky acknowledgments.

    Args:
        x_client: XClient instance (not used, kept for compatibility)
        post_id: The X post ID we're acknowledging
        note: Optional note to include with the acknowledgment

    Returns:
        True if successful, False otherwise
    """
    try:
        # Use Bluesky client to upload acks to the void data repository on atproto
        bsky_client = bsky_utils.default_login()

        # Create a synthetic URI and CID for the X post
        # X posts don't have atproto URIs/CIDs, so we create identifiers
        post_uri = f"x://twitter.com/post/{post_id}"
        post_cid = f"x_{post_id}_cid"  # Synthetic CID for X posts

        # Use the same acknowledge_post function as Bluesky
        ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)

        if ack_result:
            logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
            return True
        else:
            logger.error(f"Failed to acknowledge X post {post_id}")
            return False

    except Exception as e:
        logger.error(f"Error acknowledging X post {post_id}: {e}")
        return False
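
# Illustrative sketch of the synthetic identifiers acknowledge_x_post() builds for an X
# post (the post ID below is made up):
#
#     post_uri = "x://twitter.com/post/1890000000000000000"
#     post_cid = "x_1890000000000000000_cid"
#
# These are not real atproto URIs/CIDs; they are just stable keys so acknowledgments that
# originate on X can live in the same void data repository as Bluesky ones.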

def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
    """
    Post a series of replies to X, threading them properly.

    Args:
        x_client: XClient instance
        in_reply_to_tweet_id: The original tweet ID to reply to
        reply_messages: List of reply text strings

    Returns:
        True if successful, False otherwise
    """
    try:
        current_reply_id = in_reply_to_tweet_id

        for i, reply_text in enumerate(reply_messages):
            logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")

            result = x_client.post_reply(reply_text, current_reply_id)

            if result and 'data' in result:
                new_tweet_id = result['data']['id']
                logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
                # For threading, the next reply should reply to this one
                current_reply_id = new_tweet_id
            else:
                logger.error(f"Failed to post X reply {i+1}")
                return False

        return True

    except Exception as e:
        logger.error(f"Error posting X thread replies: {e}")
        return False
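
# Hedged usage sketch for post_x_thread_replies() (the tweet ID and texts are made up):
#
#     client = create_x_client()
#     post_x_thread_replies(
#         client,
#         in_reply_to_tweet_id="1890000000000000000",
#         reply_messages=["Part 1 of a longer answer...", "Part 2 with the details."],
#     )
#
# Each reply targets the ID returned by the previous post, which is what keeps the posts
# threaded under the original mention.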

def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
    """
    Load and process all X mentions from the queue.
    Similar to bsky.py load_and_process_queued_notifications but for X.
    """
    try:
        # Get all X mention files in queue directory
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))

        if not queue_files:
            return

        # Load file metadata and sort by creation time (chronological order)
        file_metadata = []
        for filepath in queue_files:
            try:
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)
                mention_data = queue_data.get('mention', queue_data)
                created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z')  # Default to epoch if missing
                file_metadata.append((created_at, filepath))
            except Exception as e:
                logger.warning(f"Error reading queue file {filepath.name}: {e}")
                # Add with default timestamp so it still gets processed
                file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))

        # Sort by creation time (oldest first)
        file_metadata.sort(key=lambda x: x[0])

        logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")

        for i, (created_at, filepath) in enumerate(file_metadata, 1):
            logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")

            try:
                # Load mention data
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)

                mention_data = queue_data.get('mention', queue_data)

                # Process the mention
                success = process_x_mention(void_agent, x_client, mention_data,
                                            queue_filepath=filepath, testing_mode=testing_mode)

            except XRateLimitError:
                logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
                break

            except Exception as e:
                logger.error(f"Error processing X queue file {filepath.name}: {e}")
                continue

            # Handle file based on processing result
            if success:
                if testing_mode:
                    logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
                else:
                    filepath.unlink()
                    logger.info(f"Successfully processed and removed X file: {filepath.name}")

                # Mark as processed
                processed_mentions = load_processed_mentions()
                processed_mentions.add(mention_data.get('id'))
                save_processed_mentions(processed_mentions)

            elif success is None:  # Move to error directory
                error_dir = X_QUEUE_DIR / "errors"
                error_dir.mkdir(exist_ok=True)
                error_path = error_dir / filepath.name
                filepath.rename(error_path)
                logger.warning(f"Moved X file {filepath.name} to errors directory")

            elif success == "no_reply":  # Move to no_reply directory
                no_reply_dir = X_QUEUE_DIR / "no_reply"
                no_reply_dir.mkdir(exist_ok=True)
                no_reply_path = no_reply_dir / filepath.name
                filepath.rename(no_reply_path)
                logger.info(f"Moved X file {filepath.name} to no_reply directory")

            elif success == "ignored":  # Delete ignored notifications
                filepath.unlink()
                logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")

            else:
                logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")

    except Exception as e:
        logger.error(f"Error loading queued X mentions: {e}")
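
# Hedged sketch of a queue file this loop expects (x_queue/x_mention_<id>.json). Only
# 'mention', 'id', and 'created_at' are read directly here; the remaining keys mirror
# what process_x_mention() uses elsewhere in this module and are illustrative:
#
#     {
#       "mention": {
#         "id": "1890000000000000001",
#         "created_at": "2025-01-01T12:00:00.000Z",
#         "text": "@void what do you think about this thread?",
#         "author_id": "123456789",
#         "conversation_id": "1890000000000000000"
#       }
#     }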

def process_x_notifications(void_agent, x_client, testing_mode=False):
    """
    Fetch new X mentions, queue them, and process the queue.
    Similar to bsky.py process_notifications but for X.
    """
    try:
        # Get username for fetching mentions - uses cached data to avoid rate limits
        username = x_client.get_username()
        if not username:
            logger.error("Could not get username for X mentions")
            return

        # Fetch and queue new mentions
        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            logger.info(f"Found {new_count} new X mentions to process")

        # Process the entire queue
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing X notifications: {e}")

def periodic_user_block_cleanup(client, agent_id: str) -> None:
    """
    Detach all user blocks from the agent to prevent memory bloat.
    This should be called periodically to ensure clean state.
    """
    try:
        # Get all blocks attached to the agent
        attached_blocks = client.agents.blocks.list(agent_id=agent_id)

        user_blocks_to_detach = []
        for block in attached_blocks:
            if hasattr(block, 'label') and block.label.startswith('user_'):
                user_blocks_to_detach.append({
                    'label': block.label,
                    'id': block.id
                })

        if not user_blocks_to_detach:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach")

        # Detach each user block
        for block in user_blocks_to_detach:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=block['id']
                )
                logger.debug(f"Detached user block: {block['label']}")
            except Exception as e:
                logger.error(f"Failed to detach user block {block['label']}: {e}")

        logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")
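
# Hedged sketch of the label convention periodic_user_block_cleanup() relies on: per-user
# memory blocks are assumed to be labelled "user_<x_user_id>", e.g.
#
#     block.label == "user_123456789"   # matched by the startswith('user_') check above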

def initialize_x_void():
    """Initialize the void agent for X operations."""
    logger.info("Starting void agent initialization for X...")

    from letta_client import Letta

    # Get config
    config = get_x_letta_config()
    client = Letta(token=config['api_key'], timeout=config['timeout'])
    agent_id = config['agent_id']

    try:
        void_agent = client.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        raise e

    # Clean up all user blocks at startup
    logger.info("🧹 Cleaning up user blocks at X startup...")
    periodic_user_block_cleanup(client, agent_id)

    # Ensure correct tools are attached for X
    logger.info("Configuring tools for X platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('x', void_agent.id, config['api_key'])
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Log agent details
    logger.info(f"X Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")

    return void_agent

def x_main_loop(testing_mode=False, cleanup_interval=10):
    """
    Main X bot loop that continuously monitors for mentions and processes them.
    Similar to bsky.py main() but for X/Twitter.

    Args:
        testing_mode: If True, don't actually post to X
        cleanup_interval: Run user block cleanup every N cycles (0 to disable)
    """
    import time
    from time import sleep
    from letta_client import Letta

    logger.info("=== STARTING X VOID BOT ===")

    # Configure logging from config file
    setup_logging_from_config()

    # Initialize void agent
    void_agent = initialize_x_void()
    logger.info(f"X void agent initialized: {void_agent.id}")

    # Initialize X client
    x_client = create_x_client()
    logger.info("Connected to X API")

    # Get Letta client for periodic cleanup
    config = get_x_letta_config()
    letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

    # Main loop
    FETCH_DELAY_SEC = 120  # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls)
    logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    if cleanup_interval > 0:
        logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
    else:
        logger.info("User block cleanup disabled")

    cycle_count = 0
    start_time = time.time()

    while True:
        try:
            cycle_count += 1
            logger.info(f"=== X CYCLE {cycle_count} ===")

            # Process X notifications (fetch, queue, and process)
            process_x_notifications(void_agent, x_client, testing_mode)

            # Run periodic cleanup every N cycles
            if cleanup_interval > 0 and cycle_count % cleanup_interval == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                periodic_user_block_cleanup(letta_client, void_agent.id)

            # Log cycle completion
            elapsed_time = time.time() - start_time
            logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")

            sleep(FETCH_DELAY_SEC)

        except KeyboardInterrupt:
            elapsed_time = time.time() - start_time
            logger.info("=== X BOT STOPPED BY USER ===")
            logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_DELAY_SEC * 2)
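
# Hedged usage sketch: running the loop programmatically in testing mode (no posts are
# sent, queue files are kept) with cleanup every 5 cycles; the values are illustrative.
#
#     x_main_loop(testing_mode=True, cleanup_interval=5)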

def process_queue_only(testing_mode=False):
    """
    Process all queued X mentions without fetching new ones.
    Useful for rate limit management - queue first, then process separately.

    Args:
        testing_mode: If True, don't actually post to X and keep queue files
    """
    logger.info("=== PROCESSING X QUEUE ONLY ===")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    try:
        # Initialize void agent
        void_agent = initialize_x_void()
        logger.info(f"X void agent initialized: {void_agent.id}")

        # Initialize X client
        x_client = create_x_client()
        logger.info("Connected to X API")

        # Process the queue without fetching new mentions
        logger.info("Processing existing X queue...")
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

        logger.info("=== X QUEUE PROCESSING COMPLETE ===")

    except Exception as e:
        logger.error(f"Error processing X queue: {e}")
        raise
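
# Hedged sketch of the rate-limit-friendly workflow process_queue_only() is meant for,
# using the CLI entry points defined at the bottom of this file:
#
#     python x.py queue            # fetch mentions and write them to x_queue/ only
#     python x.py process --test   # later: work through the queue without fetching
#
# Keeping the fetch and the Letta processing pass separate means a rate-limited fetch
# does not block replies to mentions that are already queued.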

def x_notification_loop():
    """
    DEPRECATED: Old X notification loop using search-based mention detection.
    Use x_main_loop() instead for the full bot experience.
    """
    logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
    x_main_loop()

if __name__ == "__main__":
    import sys
    import argparse

    if len(sys.argv) > 1:
        if sys.argv[1] == "bot":
            # Main bot with optional --test flag and cleanup interval
            parser = argparse.ArgumentParser(description='X Void Bot')
            parser.add_argument('command', choices=['bot'])
            parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
            parser.add_argument('--cleanup-interval', type=int, default=10,
                                help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
            args = parser.parse_args()
            x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval)
        elif sys.argv[1] == "loop":
            x_notification_loop()
        elif sys.argv[1] == "reply":
            reply_to_cameron_post()
        elif sys.argv[1] == "me":
            get_my_user_info()
        elif sys.argv[1] == "search":
            test_search_mentions()
        elif sys.argv[1] == "queue":
            test_fetch_and_queue()
        elif sys.argv[1] == "thread":
            test_thread_context()
        elif sys.argv[1] == "process":
            # Process all queued mentions with optional --test flag
            testing_mode = "--test" in sys.argv
            process_queue_only(testing_mode=testing_mode)
        elif sys.argv[1] == "letta":
            # Use specific agent ID if provided, otherwise use from config
            agent_id = sys.argv[2] if len(sys.argv) > 2 else None
            test_letta_integration(agent_id)
        elif sys.argv[1] == "downrank":
            # View or manage downrank list
            if len(sys.argv) > 2 and sys.argv[2] == "list":
                downrank_users = load_downrank_users()
                if downrank_users:
                    print(f"📋 Downrank users ({len(downrank_users)} total):")
                    for user_id in sorted(downrank_users):
                        print(f"  - {user_id}")
                else:
                    print("📋 No downrank users configured")
            else:
                print("Usage: python x.py downrank list")
                print("  list - Show all downranked user IDs")
                print(f"  Edit {X_DOWNRANK_USERS_FILE} to modify the list")
        else:
            print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
            print("  bot      - Run the main X bot (use --test for testing mode)")
            print("             Example: python x.py bot --test")
            print("  queue    - Fetch and queue mentions only (no processing)")
            print("  process  - Process all queued mentions only (no fetching)")
            print("             Example: python x.py process --test")
            print("  downrank - Manage downrank users (10% response rate)")
            print("             Example: python x.py downrank list")
            print("  loop     - Run the old notification monitoring loop (deprecated)")
            print("  reply    - Reply to Cameron's specific post")
            print("  me       - Get authenticated user info and correct user ID")
            print("  search   - Test search-based mention detection")
            print("  thread   - Test thread context retrieval from queued mention")
            print("  letta    - Test sending thread context to Letta agent")
            print("             Optional: python x.py letta <agent-id>")
    else:
        test_x_client()