a digital person for bluesky
1import os
2import logging
3import requests
4import yaml
5import json
6import hashlib
7import random
8import time
9from typing import Optional, Dict, Any, List, Set
10from datetime import datetime
11from pathlib import Path
12from requests_oauthlib import OAuth1
13from rich import print as rprint
14from rich.panel import Panel
15from rich.text import Text
16
17import bsky_utils
18
19class XRateLimitError(Exception):
20 """Exception raised when X API rate limit is exceeded"""
21 pass
22
23
24# Configure logging (will be updated by setup_logging_from_config if called)
25logging.basicConfig(
26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
27)
28logger = logging.getLogger("x_client")
29
30def setup_logging_from_config(config_path: str = "configs/x_config.yaml"):
31 """Configure logging based on configs/x_config.yaml settings."""
32 try:
33 config = load_x_config(config_path)
34 logging_config = config.get('logging', {})
35 log_level = logging_config.get('level', 'INFO').upper()
36
37 # Convert string level to logging constant
38 numeric_level = getattr(logging, log_level, logging.INFO)
39
40 # Update the root logger level
41 logging.getLogger().setLevel(numeric_level)
42 # Update our specific logger level
43 logger.setLevel(numeric_level)
44
45 logger.info(f"Logging level set to {log_level}")
46
47 except Exception as e:
48 logger.warning(f"Failed to configure logging from config: {e}, using default INFO level")
49
50# X-specific file paths
51X_QUEUE_DIR = Path("x_queue")
52X_CACHE_DIR = Path("x_cache")
53X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
54X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
55X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")
56
57class XClient:
58 """X (Twitter) API client for fetching mentions and managing interactions."""
59
60 def __init__(self, api_key: str, user_id: str, access_token: str = None,
61 consumer_key: str = None, consumer_secret: str = None,
62 access_token_secret: str = None):
63 self.api_key = api_key
64 self.access_token = access_token
65 self.user_id = user_id
66 self.base_url = "https://api.x.com/2"
67
68 # Check if we have OAuth 1.0a credentials
69 if (consumer_key and consumer_secret and access_token and access_token_secret):
70 # Use OAuth 1.0a for User Context
71 self.oauth = OAuth1(
72 consumer_key,
73 client_secret=consumer_secret,
74 resource_owner_key=access_token,
75 resource_owner_secret=access_token_secret
76 )
77 self.headers = {"Content-Type": "application/json"}
78 self.auth_method = "oauth1a"
79 logger.info("Using OAuth 1.0a User Context authentication for X API")
80 elif access_token:
81 # Use OAuth 2.0 Bearer token for User Context
82 self.oauth = None
83 self.headers = {
84 "Authorization": f"Bearer {access_token}",
85 "Content-Type": "application/json"
86 }
87 self.auth_method = "oauth2_user"
88 logger.info("Using OAuth 2.0 User Context access token for X API")
89 else:
90 # Use Application-Only Bearer token
91 self.oauth = None
92 self.headers = {
93 "Authorization": f"Bearer {api_key}",
94 "Content-Type": "application/json"
95 }
96 self.auth_method = "bearer"
97 logger.info("Using Application-Only Bearer token for X API")
98
99 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
100 """Make a request to the X API with proper error handling and exponential backoff."""
101 url = f"{self.base_url}{endpoint}"
102
103 # Log the specific API call being made
104 logger.debug(f"Making X API request: {method} {endpoint}")
105
106 for attempt in range(max_retries):
107 try:
108 if method.upper() == "GET":
109 if self.oauth:
110 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
111 else:
112 response = requests.get(url, headers=self.headers, params=params)
113 elif method.upper() == "POST":
114 if self.oauth:
115 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
116 else:
117 response = requests.post(url, headers=self.headers, json=data)
118 else:
119 raise ValueError(f"Unsupported HTTP method: {method}")
120
121 response.raise_for_status()
122 return response.json()
123
124 except requests.exceptions.HTTPError as e:
125 if response.status_code == 401:
126 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
127 logger.error(f"Response: {response.text}")
128 return None # Don't retry auth failures
129 elif response.status_code == 403:
130 logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
131 logger.error(f"Response: {response.text}")
132 return None # Don't retry permission failures
133 elif response.status_code == 429:
134 if attempt < max_retries - 1:
135 # Exponential backoff: 60s, 120s, 240s
136 backoff_time = 60 * (2 ** attempt)
137 logger.warning(f"X API rate limit exceeded on {method} {endpoint} (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
138 logger.error(f"Response: {response.text}")
139 time.sleep(backoff_time)
140 continue
141 else:
142 logger.error(f"X API rate limit exceeded on {method} {endpoint} - max retries reached")
143 logger.error(f"Response: {response.text}")
144 raise XRateLimitError(f"X API rate limit exceeded on {method} {endpoint}")
145 else:
146 if attempt < max_retries - 1:
147 # Exponential backoff for other HTTP errors too
148 backoff_time = 30 * (2 ** attempt)
149 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
150 logger.error(f"Response: {response.text}")
151 time.sleep(backoff_time)
152 continue
153 else:
154 logger.error(f"X API request failed after {max_retries} attempts: {e}")
155 logger.error(f"Response: {response.text}")
156 return None
157
158 except Exception as e:
159 if attempt < max_retries - 1:
160 backoff_time = 15 * (2 ** attempt)
161 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
162 time.sleep(backoff_time)
163 continue
164 else:
165 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
166 return None
167
168 return None
169
170 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
171 """
172 Fetch mentions for the configured user.
173
174 Args:
175 since_id: Minimum Post ID to include (for getting newer mentions)
176 max_results: Number of results to return (5-100)
177
178 Returns:
179 List of mention objects or None if request failed
180 """
181 endpoint = f"/users/{self.user_id}/mentions"
182 params = {
183 "max_results": min(max(max_results, 5), 100), # Ensure within API limits
184 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
185 "user.fields": "id,name,username",
186 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
187 }
188
189 if since_id:
190 params["since_id"] = since_id
191
192 logger.info(f"Fetching mentions for user {self.user_id}")
193 response = self._make_request(endpoint, params)
194
195 if response:
196 logger.debug(f"X API response: {response}")
197
198 if response and "data" in response:
199 mentions = response["data"]
200 logger.info(f"Retrieved {len(mentions)} mentions")
201 return mentions
202 else:
203 if response:
204 logger.info(f"No mentions in response. Full response: {response}")
205 else:
206 logger.warning("Request failed - no response received")
207 return []
208
209 def get_user_info(self, user_id: str) -> Optional[Dict]:
210 """Get information about a specific user."""
211 endpoint = f"/users/{user_id}"
212 params = {
213 "user.fields": "id,name,username,description,public_metrics"
214 }
215
216 response = self._make_request(endpoint, params)
217 return response.get("data") if response else None
218
219 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]:
220 """
221 Search for mentions using the search endpoint instead of mentions endpoint.
222 This might have better rate limits than the direct mentions endpoint.
223
224 Args:
225 username: Username to search for mentions of (without @)
226 max_results: Number of results to return (10-100)
227 since_id: Only return results newer than this tweet ID
228
229 Returns:
230 List of tweets mentioning the username
231 """
232 endpoint = "/tweets/search/recent"
233
234 # Search for mentions of the username
235 query = f"@{username}"
236
237 params = {
238 "query": query,
239 "max_results": min(max(max_results, 10), 100),
240 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
241 "user.fields": "id,name,username",
242 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
243 }
244
245 if since_id:
246 params["since_id"] = since_id
247
248 logger.info(f"Searching for mentions of @{username}")
249 response = self._make_request(endpoint, params)
250
251 if response and "data" in response:
252 tweets = response["data"]
253 logger.info(f"Found {len(tweets)} mentions via search")
254 return tweets
255 else:
256 if response:
257 logger.info(f"No mentions found via search. Response: {response}")
258 else:
259 logger.warning("Search request failed")
260 return []
261
262 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]:
263 """
264 Get all tweets in a conversation thread up to a specific tweet ID.
265
266 Args:
267 conversation_id: The conversation ID to fetch (should be the original tweet ID)
268 use_cache: Whether to use cached data if available
269 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)
270
271 Returns:
272 List of tweets in the conversation, ordered chronologically
273 """
274 # Check cache first if enabled
275 if use_cache:
276 cached_data = get_cached_thread_context(conversation_id)
277 if cached_data:
278 return cached_data
279
280 # First, get the original tweet directly since it might not appear in conversation search
281 logger.debug(f"Getting thread context for conversation {conversation_id}")
282 original_tweet = None
283 try:
284 endpoint = f"/tweets/{conversation_id}"
285 params = {
286 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
287 "user.fields": "id,name,username",
288 "expansions": "author_id"
289 }
290 logger.debug(f"Fetching original tweet: GET {endpoint}")
291 response = self._make_request(endpoint, params)
292 if response and "data" in response:
293 original_tweet = response["data"]
294 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
295 except Exception as e:
296 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")
297
298 # Then search for all tweets in this conversation
299 endpoint = "/tweets/search/recent"
300 params = {
301 "query": f"conversation_id:{conversation_id}",
302 "max_results": 100, # Get as many as possible
303 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
304 "user.fields": "id,name,username",
305 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
306 "sort_order": "recency" # Get newest first, we'll reverse later
307 }
308
309 # Add until_id parameter to exclude tweets after the mention being processed
310 if until_id:
311 params["until_id"] = until_id
312 logger.info(f"Using until_id={until_id} to exclude future tweets")
313
314 logger.info(f"Fetching thread context for conversation {conversation_id}")
315 logger.debug(f"Searching conversation: GET {endpoint} with query={params['query']}")
316 response = self._make_request(endpoint, params)
317
318 tweets = []
319 users_data = {}
320
321 # Collect tweets from search
322 if response and "data" in response:
323 tweets.extend(response["data"])
324 # Store user data for reference
325 if "includes" in response and "users" in response["includes"]:
326 for user in response["includes"]["users"]:
327 users_data[user["id"]] = user
328
329 # Add original tweet if we got it and it's not already in the list
330 if original_tweet:
331 tweet_ids = [t.get('id') for t in tweets]
332 if original_tweet.get('id') not in tweet_ids:
333 tweets.append(original_tweet)
334 logger.info("Added original tweet to thread context")
335
336 # Attempt to fill gaps by fetching referenced tweets that are missing
337 # This helps with X API's incomplete conversation search results
338 tweet_ids = set(t.get('id') for t in tweets)
339 missing_tweet_ids = set()
340 critical_missing_ids = set()
341
342 # Collect referenced tweet IDs, prioritizing critical ones
343 for tweet in tweets:
344 referenced_tweets = tweet.get('referenced_tweets', [])
345 for ref in referenced_tweets:
346 ref_id = ref.get('id')
347 ref_type = ref.get('type')
348 if ref_id and ref_id not in tweet_ids:
349 missing_tweet_ids.add(ref_id)
350 # Prioritize direct replies and quoted tweets over retweets
351 if ref_type in ['replied_to', 'quoted']:
352 critical_missing_ids.add(ref_id)
353
354 # For rate limit efficiency, only fetch critical missing tweets if we have many
355 if len(missing_tweet_ids) > 10:
356 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
357 missing_tweet_ids = critical_missing_ids
358
359 # Context sufficiency check - skip backfill if we already have enough context
360 if has_sufficient_context(tweets, missing_tweet_ids):
361 logger.info("Thread has sufficient context, skipping missing tweet backfill")
362 missing_tweet_ids = set()
363
364 # Fetch missing referenced tweets in batches (more rate-limit friendly)
365 if missing_tweet_ids:
366 missing_list = list(missing_tweet_ids)
367
368 # First, check cache for missing tweets
369 cached_tweets = get_cached_tweets(missing_list)
370 for tweet_id, cached_tweet in cached_tweets.items():
371 if cached_tweet.get('conversation_id') == conversation_id:
372 tweets.append(cached_tweet)
373 tweet_ids.add(tweet_id)
374 logger.info(f"Retrieved missing tweet from cache: {tweet_id}")
375
376 # Add user data if available in cache
377 if cached_tweet.get('author_info'):
378 author_id = cached_tweet.get('author_id')
379 if author_id:
380 users_data[author_id] = cached_tweet['author_info']
381
382 # Only fetch tweets that weren't found in cache
383 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]
384
385 if uncached_ids:
386 batch_size = 100 # X API limit for bulk tweet lookup
387
388 for i in range(0, len(uncached_ids), batch_size):
389 batch_ids = uncached_ids[i:i + batch_size]
390 try:
391 endpoint = "/tweets"
392 params = {
393 "ids": ",".join(batch_ids),
394 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
395 "user.fields": "id,name,username",
396 "expansions": "author_id"
397 }
398 logger.debug(f"Batch fetching missing tweets: GET {endpoint} (ids: {len(batch_ids)} tweets)")
399 response = self._make_request(endpoint, params)
400
401 if response and "data" in response:
402 fetched_tweets = []
403 batch_users_data = {}
404
405 for missing_tweet in response["data"]:
406 # Only add if it's actually part of this conversation
407 if missing_tweet.get('conversation_id') == conversation_id:
408 tweets.append(missing_tweet)
409 tweet_ids.add(missing_tweet.get('id'))
410 fetched_tweets.append(missing_tweet)
411 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")
412
413 # Add user data if available
414 if "includes" in response and "users" in response["includes"]:
415 for user in response["includes"]["users"]:
416 users_data[user["id"]] = user
417 batch_users_data[user["id"]] = user
418
419 # Cache the newly fetched tweets
420 if fetched_tweets:
421 save_cached_tweets(fetched_tweets, batch_users_data)
422
423 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")
424
425 # Handle partial success - log any missing tweets that weren't found
426 if response and "errors" in response:
427 for error in response["errors"]:
428 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")
429
430 except Exception as e:
431 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
432 else:
433 logger.info(f"All {len(missing_list)} missing tweets found in cache")
434
435 if tweets:
436 # Filter out tweets that occur after until_id (if specified)
437 if until_id:
438 original_count = len(tweets)
439 # Convert until_id to int for comparison (Twitter IDs are sequential)
440 until_id_int = int(until_id)
441 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
442 filtered_count = len(tweets)
443 if original_count != filtered_count:
444 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")
445
446 # Sort chronologically (oldest first)
447 tweets.sort(key=lambda x: x.get('created_at', ''))
448 logger.info(f"Retrieved {len(tweets)} tweets in thread")
449
450 thread_data = {"tweets": tweets, "users": users_data}
451
452 # Cache individual tweets from the thread for future backfill
453 save_cached_tweets(tweets, users_data)
454
455 # Cache the result
456 if use_cache:
457 save_cached_thread_context(conversation_id, thread_data)
458
459 return thread_data
460 else:
461 logger.warning("No tweets found for thread context")
462 return None
463
464 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
465 """
466 Post a reply to a specific tweet.
467
468 Args:
469 reply_text: The text content of the reply
470 in_reply_to_tweet_id: The ID of the tweet to reply to
471
472 Returns:
473 Response data if successful, None if failed
474 """
475 endpoint = "/tweets"
476
477 payload = {
478 "text": reply_text,
479 "reply": {
480 "in_reply_to_tweet_id": in_reply_to_tweet_id
481 }
482 }
483
484 logger.info(f"Attempting to post reply with {self.auth_method} authentication")
485 logger.debug(f"Posting reply: POST {endpoint}")
486 result = self._make_request(endpoint, method="POST", data=payload)
487
488 if result:
489 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
490 return result
491 else:
492 logger.error("Failed to post reply")
493 return None
494
495 def post_tweet(self, tweet_text: str) -> Optional[Dict]:
496 """
497 Post a standalone tweet (not a reply).
498
499 Args:
500 tweet_text: The text content of the tweet (max 280 characters)
501
502 Returns:
503 Response data if successful, None if failed
504 """
505 endpoint = "/tweets"
506
507 # Validate tweet length
508 if len(tweet_text) > 280:
509 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
510 return None
511
512 payload = {
513 "text": tweet_text
514 }
515
516 logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
517 logger.debug(f"Posting tweet: POST {endpoint}")
518 result = self._make_request(endpoint, method="POST", data=payload)
519
520 if result:
521 logger.info(f"Successfully posted tweet")
522 return result
523 else:
524 logger.error("Failed to post tweet")
525 return None
526
527 def get_user_info(self, fields: Optional[str] = None) -> Optional[Dict]:
528 """
529 Get the authenticated user's information, using cached data when available.
530 This reduces API calls significantly since user info rarely changes.
531
532 Args:
533 fields: Optional comma-separated list of user fields to fetch
534
535 Returns:
536 User data dict if successful, None if failed
537 """
538 # First try to get from cache
539 cached_user_info = get_cached_user_info()
540 if cached_user_info:
541 # Check if cached data has all requested fields
542 requested_fields = set(fields.split(',') if fields else ['id', 'username', 'name'])
543 cached_fields = set(cached_user_info.keys())
544 if requested_fields.issubset(cached_fields):
545 return cached_user_info
546
547 # Cache miss, expired, or missing requested fields - fetch from API
548 logger.debug("Fetching fresh user info from /users/me API")
549 endpoint = "/users/me"
550 params = {"user.fields": fields or "id,username,name,description"}
551
552 response = self._make_request(endpoint, params=params)
553 if response and "data" in response:
554 user_data = response["data"]
555 # Cache the result for future use
556 save_cached_user_info(user_data)
557 return user_data
558 else:
559 logger.error("Failed to get user info from /users/me API")
560 return None
561
562 def get_username(self) -> Optional[str]:
563 """
564 Get the authenticated user's username, using cached data when available.
565 This reduces API calls significantly since username rarely changes.
566
567 Returns:
568 Username string if successful, None if failed
569 """
570 user_info = self.get_user_info("id,username,name")
571 return user_info.get("username") if user_info else None
572
573def load_x_config(config_path: str = "configs/x_config.yaml") -> Dict[str, Any]:
574 """Load complete X configuration from configs/x_config.yaml."""
575 try:
576 with open(config_path, 'r') as f:
577 config = yaml.safe_load(f)
578
579 if not config:
580 raise ValueError(f"Empty or invalid configuration file: {config_path}")
581
582 # Validate required sections
583 x_config = config.get('x', {})
584 letta_config = config.get('letta', {})
585
586 if not x_config.get('api_key') or not x_config.get('user_id'):
587 raise ValueError("X API key and user_id must be configured in configs/x_config.yaml")
588
589 if not letta_config.get('api_key') or not letta_config.get('agent_id'):
590 raise ValueError("Letta API key and agent_id must be configured in configs/x_config.yaml")
591
592 return config
593 except Exception as e:
594 logger.error(f"Failed to load X configuration: {e}")
595 raise
596
597def get_x_letta_config(config_path: str = "configs/x_config.yaml") -> Dict[str, Any]:
598 """Get Letta configuration from X config file."""
599 config = load_x_config(config_path)
600 return config['letta']
601
602def create_x_client(config_path: str = "configs/x_config.yaml") -> XClient:
603 """Create and return an X client with configuration loaded from file."""
604 config = load_x_config(config_path)
605 x_config = config['x']
606 return XClient(
607 api_key=x_config['api_key'],
608 user_id=x_config['user_id'],
609 access_token=x_config.get('access_token'),
610 consumer_key=x_config.get('consumer_key'),
611 consumer_secret=x_config.get('consumer_secret'),
612 access_token_secret=x_config.get('access_token_secret')
613 )
614
615def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
616 """
617 Convert a mention object to a YAML string for better AI comprehension.
618 Similar to thread_to_yaml_string in bsky_utils.py
619 """
620 # Extract relevant fields
621 simplified_mention = {
622 'id': mention.get('id'),
623 'text': mention.get('text'),
624 'author_id': mention.get('author_id'),
625 'created_at': mention.get('created_at'),
626 'in_reply_to_user_id': mention.get('in_reply_to_user_id')
627 }
628
629 # Add user information if available
630 if users_data and mention.get('author_id') in users_data:
631 user = users_data[mention.get('author_id')]
632 simplified_mention['author'] = {
633 'username': user.get('username'),
634 'name': user.get('name')
635 }
636
637 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)
638
639def thread_to_yaml_string(thread_data: Dict) -> str:
640 """
641 Convert X thread context to YAML string for AI comprehension.
642 Similar to Bluesky's thread_to_yaml_string function.
643
644 Args:
645 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
646
647 Returns:
648 YAML string representation of the thread
649 """
650 if not thread_data or "tweets" not in thread_data:
651 return "conversation: []\n"
652
653 tweets = thread_data["tweets"]
654 users_data = thread_data.get("users", {})
655
656 simplified_thread = {
657 "conversation": []
658 }
659
660 for tweet in tweets:
661 # Get user info
662 author_id = tweet.get('author_id')
663 author_info = {}
664 if author_id and author_id in users_data:
665 user = users_data[author_id]
666 author_info = {
667 'username': user.get('username'),
668 'name': user.get('name')
669 }
670
671 # Build tweet object (simplified for AI consumption)
672 tweet_obj = {
673 'text': tweet.get('text'),
674 'created_at': tweet.get('created_at'),
675 'author': author_info,
676 'author_id': author_id # Include user ID for block management
677 }
678
679 simplified_thread["conversation"].append(tweet_obj)
680
681 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)
682
683
684# X Caching and Queue System Functions
685
686def load_last_seen_id() -> Optional[str]:
687 """Load the last seen mention ID for incremental fetching."""
688 if X_LAST_SEEN_FILE.exists():
689 try:
690 with open(X_LAST_SEEN_FILE, 'r') as f:
691 data = json.load(f)
692 return data.get('last_seen_id')
693 except Exception as e:
694 logger.error(f"Error loading last seen ID: {e}")
695 return None
696
697def save_last_seen_id(mention_id: str):
698 """Save the last seen mention ID."""
699 try:
700 X_QUEUE_DIR.mkdir(exist_ok=True)
701 with open(X_LAST_SEEN_FILE, 'w') as f:
702 json.dump({
703 'last_seen_id': mention_id,
704 'updated_at': datetime.now().isoformat()
705 }, f)
706 logger.debug(f"Saved last seen ID: {mention_id}")
707 except Exception as e:
708 logger.error(f"Error saving last seen ID: {e}")
709
710def load_processed_mentions() -> set:
711 """Load the set of processed mention IDs."""
712 if X_PROCESSED_MENTIONS_FILE.exists():
713 try:
714 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
715 data = json.load(f)
716 # Keep only recent entries (last 10000)
717 if len(data) > 10000:
718 data = data[-10000:]
719 save_processed_mentions(set(data))
720 return set(data)
721 except Exception as e:
722 logger.error(f"Error loading processed mentions: {e}")
723 return set()
724
725def save_processed_mentions(processed_set: set):
726 """Save the set of processed mention IDs."""
727 try:
728 X_QUEUE_DIR.mkdir(exist_ok=True)
729 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
730 json.dump(list(processed_set), f)
731 except Exception as e:
732 logger.error(f"Error saving processed mentions: {e}")
733
734def load_downrank_users() -> Set[str]:
735 """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
736 try:
737 if not X_DOWNRANK_USERS_FILE.exists():
738 return set()
739
740 downrank_users = set()
741 with open(X_DOWNRANK_USERS_FILE, 'r') as f:
742 for line in f:
743 line = line.strip()
744 # Skip empty lines and comments
745 if line and not line.startswith('#'):
746 downrank_users.add(line)
747
748 logger.info(f"Loaded {len(downrank_users)} downrank users")
749 return downrank_users
750 except Exception as e:
751 logger.error(f"Error loading downrank users: {e}")
752 return set()
753
754def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
755 """
756 Check if we should respond to a downranked user.
757 Returns True 10% of the time for downranked users, True 100% of the time for others.
758 """
759 if user_id not in downrank_users:
760 return True
761
762 # 10% chance for downranked users
763 should_respond = random.random() < 0.1
764 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
765 return should_respond
766
767def save_mention_to_queue(mention: Dict):
768 """Save a mention to the queue directory for async processing."""
769 try:
770 mention_id = mention.get('id')
771 if not mention_id:
772 logger.error("Mention missing ID, cannot queue")
773 return
774
775 # Check if already processed
776 processed_mentions = load_processed_mentions()
777 if mention_id in processed_mentions:
778 logger.debug(f"Mention {mention_id} already processed, skipping")
779 return
780
781 # Create queue directory
782 X_QUEUE_DIR.mkdir(exist_ok=True)
783
784 # Create filename using hash (similar to Bluesky system)
785 mention_str = json.dumps(mention, sort_keys=True)
786 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
787 filename = f"x_mention_{mention_hash}.json"
788
789 queue_file = X_QUEUE_DIR / filename
790
791 # Save mention data with enhanced debugging information
792 mention_data = {
793 'mention': mention,
794 'queued_at': datetime.now().isoformat(),
795 'type': 'x_mention',
796 # Debug info for conversation tracking
797 'debug_info': {
798 'mention_id': mention.get('id'),
799 'author_id': mention.get('author_id'),
800 'conversation_id': mention.get('conversation_id'),
801 'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
802 'referenced_tweets': mention.get('referenced_tweets', []),
803 'text_preview': mention.get('text', '')[:200],
804 'created_at': mention.get('created_at'),
805 'public_metrics': mention.get('public_metrics', {}),
806 'context_annotations': mention.get('context_annotations', [])
807 }
808 }
809
810 with open(queue_file, 'w') as f:
811 json.dump(mention_data, f, indent=2)
812
813 logger.info(f"Queued X mention {mention_id} -> {filename}")
814
815 except Exception as e:
816 logger.error(f"Error saving mention to queue: {e}")
817
818# X Cache Functions
819def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
820 """Load cached thread context if available."""
821 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
822 if cache_file.exists():
823 try:
824 with open(cache_file, 'r') as f:
825 cached_data = json.load(f)
826 # Check if cache is recent (within 1 hour)
827 from datetime import datetime, timedelta
828 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
829 if datetime.now() - cached_time < timedelta(hours=1):
830 logger.info(f"Using cached thread context for {conversation_id}")
831 return cached_data.get('thread_data')
832 except Exception as e:
833 logger.warning(f"Error loading cached thread context: {e}")
834 return None
835
836def save_cached_thread_context(conversation_id: str, thread_data: Dict):
837 """Save thread context to cache."""
838 try:
839 X_CACHE_DIR.mkdir(exist_ok=True)
840 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
841
842 cache_data = {
843 'conversation_id': conversation_id,
844 'thread_data': thread_data,
845 'cached_at': datetime.now().isoformat()
846 }
847
848 with open(cache_file, 'w') as f:
849 json.dump(cache_data, f, indent=2)
850
851 logger.debug(f"Cached thread context for {conversation_id}")
852 except Exception as e:
853 logger.error(f"Error caching thread context: {e}")
854
855def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
856 """
857 Load cached individual tweets if available.
858 Returns dict mapping tweet_id -> tweet_data for found tweets.
859 """
860 cached_tweets = {}
861
862 for tweet_id in tweet_ids:
863 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
864 if cache_file.exists():
865 try:
866 with open(cache_file, 'r') as f:
867 cached_data = json.load(f)
868
869 # Use longer cache times for older tweets (24 hours vs 1 hour)
870 from datetime import datetime, timedelta
871 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
872 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')
873
874 # Parse tweet creation time to determine age
875 try:
876 from dateutil.parser import parse
877 tweet_age = datetime.now() - parse(tweet_created)
878 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
879 except:
880 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails
881
882 if datetime.now() - cached_time < cache_duration:
883 cached_tweets[tweet_id] = cached_data.get('tweet_data')
884 logger.debug(f"Using cached tweet {tweet_id}")
885
886 except Exception as e:
887 logger.warning(f"Error loading cached tweet {tweet_id}: {e}")
888
889 return cached_tweets
890
891def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
892 """Save individual tweets to cache for future reuse."""
893 try:
894 X_CACHE_DIR.mkdir(exist_ok=True)
895
896 for tweet in tweets_data:
897 tweet_id = tweet.get('id')
898 if not tweet_id:
899 continue
900
901 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
902
903 # Include user data if available
904 tweet_with_user = tweet.copy()
905 if users_data and tweet.get('author_id') in users_data:
906 tweet_with_user['author_info'] = users_data[tweet.get('author_id')]
907
908 cache_data = {
909 'tweet_id': tweet_id,
910 'tweet_data': tweet_with_user,
911 'cached_at': datetime.now().isoformat()
912 }
913
914 with open(cache_file, 'w') as f:
915 json.dump(cache_data, f, indent=2)
916
917 logger.debug(f"Cached individual tweet {tweet_id}")
918
919 except Exception as e:
920 logger.error(f"Error caching individual tweets: {e}")
921
922def get_cached_user_info() -> Optional[Dict]:
923 """Load cached user info if available and not expired."""
924 cache_file = X_CACHE_DIR / "user_info.json"
925 if cache_file.exists():
926 try:
927 with open(cache_file, 'r') as f:
928 cached_data = json.load(f)
929 # Check if cache is recent (within 24 hours)
930 from datetime import datetime, timedelta
931 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
932 if datetime.now() - cached_time < timedelta(hours=24):
933 logger.debug("Using cached user info")
934 return cached_data.get('data')
935 else:
936 logger.debug("Cached user info expired (>24 hours old)")
937 except Exception as e:
938 logger.warning(f"Error loading cached user info: {e}")
939 return None
940
941def save_cached_user_info(user_data: Dict):
942 """Save user info to cache."""
943 try:
944 X_CACHE_DIR.mkdir(exist_ok=True)
945 cache_file = X_CACHE_DIR / "user_info.json"
946
947 from datetime import datetime
948 cache_data = {
949 'data': user_data,
950 'cached_at': datetime.now().isoformat()
951 }
952
953 with open(cache_file, 'w') as f:
954 json.dump(cache_data, f, indent=2)
955
956 logger.debug(f"Cached user info: {user_data.get('username')}")
957
958 except Exception as e:
959 logger.error(f"Error caching user info: {e}")
960
961def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
962 """
963 Determine if we have sufficient context to skip backfilling missing tweets.
964
965 Args:
966 tweets: List of tweets already in the thread
967 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch
968
969 Returns:
970 True if context is sufficient, False if backfill is needed
971 """
972 # If no missing tweets, context is sufficient
973 if not missing_tweet_ids:
974 return True
975
976 # If we have a substantial conversation (5+ tweets), likely sufficient
977 if len(tweets) >= 5:
978 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
979 return True
980
981 # If only a few missing tweets and we have some context, might be enough
982 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
983 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
984 return True
985
986 # Check if we have conversational flow (mentions between users)
987 has_conversation_flow = False
988 for tweet in tweets:
989 text = tweet.get('text', '').lower()
990 # Look for mentions, replies, or conversational indicators
991 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1:
992 has_conversation_flow = True
993 break
994
995 # If we have clear conversational flow and reasonable length, sufficient
996 if has_conversation_flow and len(tweets) >= 2:
997 logger.debug("Thread has conversational flow, considering sufficient")
998 return True
999
1000 # Otherwise, we need to backfill
1001 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
1002 return False
1003
1004def fetch_and_queue_mentions(username: str) -> int:
1005 """
1006 Single-pass function to fetch new mentions and queue them.
1007 Returns number of new mentions found.
1008 """
1009 try:
1010 client = create_x_client()
1011
1012 # Load last seen ID for incremental fetching
1013 last_seen_id = load_last_seen_id()
1014
1015 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")
1016
1017 # Search for mentions - this calls GET /2/tweets/search/recent
1018 logger.debug(f"Calling search_mentions API for @{username}")
1019 mentions = client.search_mentions(
1020 username=username,
1021 since_id=last_seen_id,
1022 max_results=100 # Get as many as possible
1023 )
1024
1025 if not mentions:
1026 logger.info("No new mentions found")
1027 return 0
1028
1029 # Process mentions (newest first, so reverse to process oldest first)
1030 mentions.reverse()
1031 new_count = 0
1032
1033 for mention in mentions:
1034 save_mention_to_queue(mention)
1035 new_count += 1
1036
1037 # Update last seen ID to the most recent mention
1038 if mentions:
1039 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent
1040 save_last_seen_id(most_recent_id)
1041
1042 logger.info(f"Queued {new_count} new X mentions")
1043 return new_count
1044
1045 except Exception as e:
1046 logger.error(f"Error fetching and queuing mentions: {e}")
1047 return 0
1048
1049# Simple test function
1050def get_my_user_info():
1051 """Get the authenticated user's information to find correct user ID."""
1052 try:
1053 client = create_x_client()
1054
1055 # Get authenticated user info using cached method
1056 print("Fetching authenticated user information...")
1057 user_data = client.get_user_info("id,name,username,description")
1058
1059 if user_data:
1060 print(f"✅ Found authenticated user:")
1061 print(f" ID: {user_data.get('id')}")
1062 print(f" Username: @{user_data.get('username')}")
1063 print(f" Name: {user_data.get('name')}")
1064 print(f" Description: {user_data.get('description', 'N/A')[:100]}...")
1065 print(f"\n🔧 Update your x_config.yaml with:")
1066 print(f" user_id: \"{user_data.get('id')}\"")
1067 return user_data
1068 else:
1069 print("❌ Failed to get user information")
1070 print(f"Response: {response}")
1071 return None
1072
1073 except Exception as e:
1074 print(f"Error getting user info: {e}")
1075 return None
1076
1077def test_search_mentions():
1078 """Test the search-based mention detection."""
1079 try:
1080 client = create_x_client()
1081
1082 # First get our username
1083 username = client.get_username()
1084 if not username:
1085 print("❌ Could not get username")
1086 return
1087 print(f"🔍 Searching for mentions of @{username}")
1088
1089 mentions = client.search_mentions(username, max_results=5)
1090
1091 if mentions:
1092 print(f"✅ Found {len(mentions)} mentions via search:")
1093 for mention in mentions:
1094 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
1095 else:
1096 print("No mentions found via search")
1097
1098 except Exception as e:
1099 print(f"Search test failed: {e}")
1100
1101def test_fetch_and_queue():
1102 """Test the single-pass fetch and queue function."""
1103 try:
1104 client = create_x_client()
1105
1106 # Get our username
1107 username = client.get_username()
1108 if not username:
1109 print("❌ Could not get username")
1110 return
1111 print(f"🔄 Fetching and queueing mentions for @{username}")
1112
1113 # Show current state
1114 last_seen = load_last_seen_id()
1115 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")
1116
1117 # Fetch and queue
1118 new_count = fetch_and_queue_mentions(username)
1119
1120 if new_count > 0:
1121 print(f"✅ Queued {new_count} new mentions")
1122 print(f"📁 Check ./x_queue/ directory for queued mentions")
1123
1124 # Show updated state
1125 new_last_seen = load_last_seen_id()
1126 print(f"📍 Updated last seen ID: {new_last_seen}")
1127 else:
1128 print("ℹ️ No new mentions to queue")
1129
1130 except Exception as e:
1131 print(f"Fetch and queue test failed: {e}")
1132
1133def test_thread_context():
1134 """Test thread context retrieval from a queued mention."""
1135 try:
1136 import json
1137
1138 # Find a queued mention file
1139 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1140 if not queue_files:
1141 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1142 return
1143
1144 # Read the first mention
1145 mention_file = queue_files[0]
1146 with open(mention_file, 'r') as f:
1147 mention_data = json.load(f)
1148
1149 mention = mention_data['mention']
1150 print(f"📄 Using mention: {mention.get('id')}")
1151 print(f"📝 Text: {mention.get('text')}")
1152
1153 # Check if it has a conversation_id
1154 conversation_id = mention.get('conversation_id')
1155 if not conversation_id:
1156 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
1157 return
1158
1159 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1160
1161 # Get thread context
1162 client = create_x_client()
1163 thread_data = client.get_thread_context(conversation_id)
1164
1165 if thread_data:
1166 tweets = thread_data.get('tweets', [])
1167 print(f"✅ Retrieved thread with {len(tweets)} tweets")
1168
1169 # Convert to YAML
1170 yaml_thread = thread_to_yaml_string(thread_data)
1171
1172 # Save thread context for inspection
1173 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
1174 with open(thread_file, 'w') as f:
1175 f.write(yaml_thread)
1176
1177 print(f"💾 Saved thread context to: {thread_file}")
1178 print("\n📋 Thread preview:")
1179 print(yaml_thread)
1180 else:
1181 print("❌ Failed to retrieve thread context")
1182
1183 except Exception as e:
1184 print(f"Thread context test failed: {e}")
1185
1186def test_letta_integration(agent_id: str = None):
1187 """Test sending X thread context to Letta agent."""
1188 try:
1189 from letta_client import Letta
1190 import json
1191 import yaml
1192
1193 # Load X config to access letta section
1194 try:
1195 x_config = load_x_config()
1196 letta_config = x_config.get('letta', {})
1197 api_key = letta_config.get('api_key')
1198 config_agent_id = letta_config.get('agent_id')
1199
1200 # Use agent_id from config if not provided as parameter
1201 if not agent_id:
1202 if config_agent_id:
1203 agent_id = config_agent_id
1204 print(f"ℹ️ Using agent_id from config: {agent_id}")
1205 else:
1206 print("❌ No agent_id found in x_config.yaml")
1207 print("Expected config structure:")
1208 print(" letta:")
1209 print(" agent_id: your-agent-id")
1210 return
1211 else:
1212 print(f"ℹ️ Using provided agent_id: {agent_id}")
1213
1214 if not api_key:
1215 # Try loading from environment as fallback
1216 import os
1217 api_key = os.getenv('LETTA_API_KEY')
1218 if not api_key:
1219 print("❌ LETTA_API_KEY not found in x_config.yaml or environment")
1220 print("Expected config structure:")
1221 print(" letta:")
1222 print(" api_key: your-letta-api-key")
1223 return
1224 else:
1225 print("ℹ️ Using LETTA_API_KEY from environment")
1226 else:
1227 print("ℹ️ Using LETTA_API_KEY from x_config.yaml")
1228
1229 except Exception as e:
1230 print(f"❌ Error loading config: {e}")
1231 return
1232
1233 letta_client = Letta(token=api_key, timeout=600)
1234 print(f"🤖 Connected to Letta, using agent: {agent_id}")
1235
1236 # Find a queued mention file
1237 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1238 if not queue_files:
1239 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1240 return
1241
1242 # Read the first mention
1243 mention_file = queue_files[0]
1244 with open(mention_file, 'r') as f:
1245 mention_data = json.load(f)
1246
1247 mention = mention_data['mention']
1248 conversation_id = mention.get('conversation_id')
1249
1250 if not conversation_id:
1251 print("❌ No conversation_id found in mention.")
1252 return
1253
1254 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1255
1256 # Get thread context
1257 x_client = create_x_client()
1258 thread_data = x_client.get_thread_context(conversation_id)
1259
1260 if not thread_data:
1261 print("❌ Failed to retrieve thread context")
1262 return
1263
1264 # Convert to YAML
1265 yaml_thread = thread_to_yaml_string(thread_data)
1266
1267 # Create prompt for the agent
1268 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.
1269
1270Here is the thread context:
1271
1272{yaml_thread}
1273
1274Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""
1275
1276 prompt_char_count = len(prompt)
1277 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")
1278
1279 # Print the prompt in a rich panel
1280 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))
1281
1282 # Send to Letta agent using streaming
1283 message_stream = letta_client.agents.messages.create_stream(
1284 agent_id=agent_id,
1285 messages=[{"role": "user", "content": prompt}],
1286 stream_tokens=False,
1287 max_steps=10
1288 )
1289
1290 print("🔄 Streaming response from agent...")
1291 response_text = ""
1292
1293 for chunk in message_stream:
1294 print(chunk)
1295 if hasattr(chunk, 'message_type'):
1296 if chunk.message_type == 'assistant_message':
1297 print(f"🤖 Agent response: {chunk.content}")
1298 response_text = chunk.content
1299 elif chunk.message_type == 'reasoning_message':
1300 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
1301 elif chunk.message_type == 'tool_call_message':
1302 print(f"🔧 Agent tool call: {chunk.tool_call.name}")
1303
1304 if response_text:
1305 print(f"\n✅ Agent generated response:")
1306 print(f"📝 Response: {response_text}")
1307 else:
1308 print("❌ No response generated by agent")
1309
1310 except Exception as e:
1311 print(f"Letta integration test failed: {e}")
1312 import traceback
1313 traceback.print_exc()
1314
1315def test_x_client():
1316 """Test the X client by fetching mentions."""
1317 try:
1318 client = create_x_client()
1319 mentions = client.get_mentions(max_results=5)
1320
1321 if mentions:
1322 print(f"Successfully retrieved {len(mentions)} mentions:")
1323 for mention in mentions:
1324 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
1325 else:
1326 print("No mentions retrieved")
1327
1328 except Exception as e:
1329 print(f"Test failed: {e}")
1330
1331def reply_to_cameron_post():
1332 """
1333 Reply to Cameron's specific X post.
1334
1335 NOTE: This requires OAuth User Context authentication, not Bearer token.
1336 Current Bearer token is Application-Only which can't post.
1337 """
1338 try:
1339 client = create_x_client()
1340
1341 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
1342 cameron_post_id = "1950690566909710618"
1343
1344 # Simple reply message
1345 reply_text = "Hello from void! 🤖 Testing X integration."
1346
1347 print(f"Attempting to reply to post {cameron_post_id}")
1348 print(f"Reply text: {reply_text}")
1349 print("\nNOTE: This will fail with current Bearer token (Application-Only)")
1350 print("Posting requires OAuth User Context authentication")
1351
1352 result = client.post_reply(reply_text, cameron_post_id)
1353
1354 if result:
1355 print(f"✅ Successfully posted reply!")
1356 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
1357 else:
1358 print("❌ Failed to post reply (expected with current auth)")
1359
1360 except Exception as e:
1361 print(f"Reply failed: {e}")
1362
1363def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
1364 """
1365 Process an X mention and generate a reply using the Letta agent.
1366 Similar to bsky.py process_mention but for X/Twitter.
1367
1368 Args:
1369 void_agent: The Letta agent instance
1370 x_client: The X API client
1371 mention_data: The mention data dictionary
1372 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
1373 testing_mode: If True, don't actually post to X
1374
1375 Returns:
1376 True: Successfully processed, remove from queue
1377 False: Failed but retryable, keep in queue
1378 None: Failed with non-retryable error, move to errors directory
1379 "no_reply": No reply was generated, move to no_reply directory
1380 """
1381 try:
1382 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")
1383
1384 # Extract mention details
1385 if isinstance(mention_data, dict):
1386 # Handle both raw mention and queued mention formats
1387 if 'mention' in mention_data:
1388 mention = mention_data['mention']
1389 else:
1390 mention = mention_data
1391 else:
1392 mention = mention_data
1393
1394 mention_id = mention.get('id')
1395 mention_text = mention.get('text', '')
1396 author_id = mention.get('author_id')
1397 conversation_id = mention.get('conversation_id')
1398 in_reply_to_user_id = mention.get('in_reply_to_user_id')
1399 referenced_tweets = mention.get('referenced_tweets', [])
1400
1401 # Check downrank list - only respond to downranked users 10% of the time
1402 downrank_users = load_downrank_users()
1403 if not should_respond_to_downranked_user(str(author_id), downrank_users):
1404 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
1405 return "no_reply"
1406
1407 # Enhanced conversation tracking for debug - especially important for Grok handling
1408 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
1409 logger.info(f" Author ID: {author_id}")
1410 logger.info(f" Conversation ID: {conversation_id}")
1411 logger.info(f" In Reply To User ID: {in_reply_to_user_id}")
1412 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items")
1413 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets
1414 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
1415 logger.info(f" Text preview: {mention_text[:100]}...")
1416
1417 if not conversation_id:
1418 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
1419 return None
1420
1421 # Get thread context with caching enabled for efficiency
1422 # Use mention_id as until_id to exclude tweets that occurred after this mention
1423 try:
1424 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
1425 if not thread_data:
1426 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
1427 return False
1428
1429 # If this mention references a specific tweet, ensure we have that tweet in context
1430 if referenced_tweets:
1431 for ref in referenced_tweets:
1432 if ref.get('type') == 'replied_to':
1433 ref_id = ref.get('id')
1434 # Check if the referenced tweet is in our thread data
1435 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
1436 if ref_id and ref_id not in thread_tweet_ids:
1437 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
1438 try:
1439 # Fetch the missing referenced tweet directly
1440 endpoint = f"/tweets/{ref_id}"
1441 params = {
1442 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
1443 "user.fields": "id,name,username",
1444 "expansions": "author_id"
1445 }
1446 logger.debug(f"Fetching individual missing tweet: GET {endpoint}")
1447 response = x_client._make_request(endpoint, params)
1448 if response and "data" in response:
1449 missing_tweet = response["data"]
1450 if missing_tweet.get('conversation_id') == conversation_id:
1451 # Add to thread data
1452 if 'tweets' not in thread_data:
1453 thread_data['tweets'] = []
1454 thread_data['tweets'].append(missing_tweet)
1455
1456 # Add user data if available
1457 if "includes" in response and "users" in response["includes"]:
1458 if 'users' not in thread_data:
1459 thread_data['users'] = {}
1460 for user in response["includes"]["users"]:
1461 thread_data['users'][user["id"]] = user
1462
1463 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
1464 else:
1465 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
1466 except Exception as e:
1467 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")
1468
1469 # Enhanced thread context debugging
1470 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
1471 thread_posts = thread_data.get('tweets', [])
1472 thread_users = thread_data.get('users', {})
1473 logger.info(f" Posts in thread: {len(thread_posts)}")
1474 logger.info(f" Users in thread: {len(thread_users)}")
1475
1476 # Log thread participants for Grok detection
1477 for user_id, user_info in thread_users.items():
1478 username = user_info.get('username', 'unknown')
1479 name = user_info.get('name', 'Unknown')
1480 is_verified = user_info.get('verified', False)
1481 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}")
1482
1483 # Special logging for Grok or AI-related users
1484 if 'grok' in username.lower() or 'grok' in name.lower():
1485 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})")
1486
1487 # Log conversation structure
1488 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts
1489 post_id = post.get('id')
1490 post_author = post.get('author_id')
1491 post_text = post.get('text', '')[:50]
1492 is_reply = 'in_reply_to_user_id' in post
1493 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")
1494
1495 except Exception as e:
1496 logger.error(f"❌ Error getting thread context: {e}")
1497 return False
1498
1499 # Convert to YAML string
1500 thread_context = thread_to_yaml_string(thread_data)
1501 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")
1502
1503 # Save comprehensive conversation data for debugging
1504 try:
1505 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1506 debug_dir.mkdir(parents=True, exist_ok=True)
1507
1508 # Save raw thread data (JSON)
1509 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
1510 json.dump(thread_data, f, indent=2)
1511
1512 # Save YAML thread context
1513 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
1514 f.write(thread_context)
1515
1516 # Save mention processing debug info
1517 debug_info = {
1518 'processed_at': datetime.now().isoformat(),
1519 'mention_id': mention_id,
1520 'conversation_id': conversation_id,
1521 'author_id': author_id,
1522 'in_reply_to_user_id': in_reply_to_user_id,
1523 'referenced_tweets': referenced_tweets,
1524 'thread_stats': {
1525 'total_posts': len(thread_posts),
1526 'total_users': len(thread_users),
1527 'yaml_length': len(thread_context)
1528 },
1529 'users_in_conversation': {
1530 user_id: {
1531 'username': user_info.get('username'),
1532 'name': user_info.get('name'),
1533 'verified': user_info.get('verified', False),
1534 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
1535 }
1536 for user_id, user_info in thread_users.items()
1537 }
1538 }
1539
1540 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
1541 json.dump(debug_info, f, indent=2)
1542
1543 logger.info(f"💾 Saved conversation debug data to: {debug_dir}")
1544
1545 except Exception as debug_error:
1546 logger.warning(f"Failed to save debug data: {debug_error}")
1547 # Continue processing even if debug save fails
1548
1549 # Check for #voidstop
1550 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
1551 logger.info("Found #voidstop, skipping this mention")
1552 return True
1553
1554 # Create prompt for Letta agent
1555 author_info = thread_data.get('users', {}).get(author_id, {})
1556 author_username = author_info.get('username', 'unknown')
1557 author_name = author_info.get('name', author_username)
1558
1559 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).
1560
1561MOST RECENT POST (the mention you're responding to):
1562"{mention_text}"
1563
1564FULL THREAD CONTEXT:
1565```yaml
1566{thread_context}
1567```
1568
1569The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.
1570
1571If you need to update user information, use the x_user_* tools.
1572
1573To reply, use the add_post_to_x_thread tool:
1574- Each call creates one post (max 280 characters)
1575- For most responses, a single call is sufficient
1576- Only use multiple calls for threaded replies when:
1577 * The topic requires extended explanation that cannot fit in 280 characters
1578 * You're explicitly asked for a detailed/long response
1579 * The conversation naturally benefits from a structured multi-part answer
1580- Avoid unnecessary threads - be concise when possible"""
1581
1582 # Log mention processing
1583 title = f"X MENTION FROM @{author_username}"
1584 print(f"\n▶ {title}")
1585 print(f" {'═' * len(title)}")
1586 for line in mention_text.split('\n'):
1587 print(f" {line}")
1588
1589 # Send to Letta agent
1590 from letta_client import Letta
1591
1592 config = get_x_letta_config()
1593 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
1594
1595 prompt_char_count = len(prompt)
1596 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")
1597
1598 try:
1599 # Use streaming to avoid timeout errors
1600 message_stream = letta_client.agents.messages.create_stream(
1601 agent_id=void_agent.id,
1602 messages=[{"role": "user", "content": prompt}],
1603 stream_tokens=False,
1604 max_steps=100
1605 )
1606
1607 # Collect streaming response (simplified version of bsky.py logic)
1608 all_messages = []
1609 for chunk in message_stream:
1610 if hasattr(chunk, 'message_type'):
1611 if chunk.message_type == 'reasoning_message':
1612 print("\n◆ Reasoning")
1613 print(" ─────────")
1614 for line in chunk.reasoning.split('\n'):
1615 print(f" {line}")
1616 elif chunk.message_type == 'tool_call_message':
1617 tool_name = chunk.tool_call.name
1618 if tool_name == 'add_post_to_x_thread':
1619 try:
1620 args = json.loads(chunk.tool_call.arguments)
1621 text = args.get('text', '')
1622 if text:
1623 print("\n✎ X Post")
1624 print(" ────────")
1625 for line in text.split('\n'):
1626 print(f" {line}")
1627 except:
1628 pass
1629 elif tool_name == 'halt_activity':
1630 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
1631 if queue_filepath and queue_filepath.exists():
1632 queue_filepath.unlink()
1633 logger.info(f"Deleted queue file: {queue_filepath.name}")
1634 logger.info("=== X BOT TERMINATED BY AGENT ===")
1635 exit(0)
1636 elif chunk.message_type == 'tool_return_message':
1637 tool_name = chunk.name
1638 status = chunk.status
1639 if status == 'success' and tool_name == 'add_post_to_x_thread':
1640 print("\n✓ X Post Queued")
1641 print(" ──────────────")
1642 print(" Post queued successfully")
1643 elif chunk.message_type == 'assistant_message':
1644 print("\n▶ Assistant Response")
1645 print(" ──────────────────")
1646 for line in chunk.content.split('\n'):
1647 print(f" {line}")
1648
1649 all_messages.append(chunk)
1650 if str(chunk) == 'done':
1651 break
1652
1653 # Convert streaming response for compatibility
1654 message_response = type('StreamingResponse', (), {
1655 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
1656 })()
1657
1658 except Exception as api_error:
1659 logger.error(f"Letta API error: {api_error}")
1660 raise
1661
1662 # Extract successful add_post_to_x_thread tool calls
1663 reply_candidates = []
1664 tool_call_results = {}
1665 ignored_notification = False
1666 ack_note = None # Track any note from annotate_ack tool
1667
1668 # First pass: collect tool return statuses
1669 for message in message_response.messages:
1670 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
1671 if message.name == 'add_post_to_x_thread':
1672 tool_call_results[message.tool_call_id] = message.status
1673 elif message.name == 'ignore_notification':
1674 if message.status == 'success':
1675 ignored_notification = True
1676 logger.info("🚫 X notification ignored")
1677
1678 # Second pass: collect successful tool calls
1679 for message in message_response.messages:
1680 if hasattr(message, 'tool_call') and message.tool_call:
1681 # Collect annotate_ack tool calls
1682 if message.tool_call.name == 'annotate_ack':
1683 try:
1684 args = json.loads(message.tool_call.arguments)
1685 note = args.get('note', '')
1686 if note:
1687 ack_note = note
1688 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
1689 except json.JSONDecodeError as e:
1690 logger.error(f"Failed to parse annotate_ack arguments: {e}")
1691
1692 # Collect add_post_to_x_thread tool calls - only if they were successful
1693 elif message.tool_call.name == 'add_post_to_x_thread':
1694 tool_call_id = message.tool_call.tool_call_id
1695 tool_status = tool_call_results.get(tool_call_id, 'unknown')
1696
1697 if tool_status == 'success':
1698 try:
1699 args = json.loads(message.tool_call.arguments)
1700 reply_text = args.get('text', '')
1701 if reply_text:
1702 reply_candidates.append(reply_text)
1703 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
1704 except json.JSONDecodeError as e:
1705 logger.error(f"Failed to parse tool call arguments: {e}")
1706
1707 # Save agent response data to debug folder
1708 try:
1709 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1710
1711 # Save complete agent interaction
1712 agent_response_data = {
1713 'processed_at': datetime.now().isoformat(),
1714 'mention_id': mention_id,
1715 'conversation_id': conversation_id,
1716 'prompt_sent': prompt,
1717 'reply_candidates': reply_candidates,
1718 'ignored_notification': ignored_notification,
1719 'ack_note': ack_note,
1720 'tool_call_results': tool_call_results,
1721 'all_messages': []
1722 }
1723
1724 # Convert messages to serializable format
1725 for message in message_response.messages:
1726 msg_data = {
1727 'message_type': getattr(message, 'message_type', 'unknown'),
1728 'content': getattr(message, 'content', ''),
1729 'reasoning': getattr(message, 'reasoning', ''),
1730 'status': getattr(message, 'status', ''),
1731 'name': getattr(message, 'name', ''),
1732 }
1733
1734 if hasattr(message, 'tool_call') and message.tool_call:
1735 msg_data['tool_call'] = {
1736 'name': message.tool_call.name,
1737 'arguments': message.tool_call.arguments,
1738 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
1739 }
1740
1741 agent_response_data['all_messages'].append(msg_data)
1742
1743 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
1744 json.dump(agent_response_data, f, indent=2)
1745
1746 logger.info(f"💾 Saved agent response debug data")
1747
1748 except Exception as debug_error:
1749 logger.warning(f"Failed to save agent response debug data: {debug_error}")
1750
1751 # Handle conflicts
1752 if reply_candidates and ignored_notification:
1753 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
1754 return False
1755
1756 if reply_candidates:
1757 # Post replies to X
1758 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")
1759
1760 if len(reply_candidates) == 1:
1761 content = reply_candidates[0]
1762 title = f"Reply to @{author_username}"
1763 else:
1764 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
1765 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"
1766
1767 print(f"\n✎ {title}")
1768 print(f" {'─' * len(title)}")
1769 for line in content.split('\n'):
1770 print(f" {line}")
1771
1772 if testing_mode:
1773 logger.info("TESTING MODE: Skipping actual X post")
1774 return True
1775 else:
1776 # Post to X using thread approach
1777 success = post_x_thread_replies(x_client, mention_id, reply_candidates)
1778 if success:
1779 logger.info(f"Successfully replied to @{author_username} on X")
1780
1781 # Acknowledge the post we're replying to
1782 try:
1783 ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
1784 if ack_result:
1785 if ack_note:
1786 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
1787 else:
1788 logger.info(f"Successfully acknowledged X post from @{author_username}")
1789 else:
1790 logger.warning(f"Failed to acknowledge X post from @{author_username}")
1791 except Exception as e:
1792 logger.error(f"Error acknowledging X post from @{author_username}: {e}")
1793 # Don't fail the entire operation if acknowledgment fails
1794
1795 return True
1796 else:
1797 logger.error(f"Failed to send reply to @{author_username} on X")
1798 return False
1799 else:
1800 if ignored_notification:
1801 logger.info(f"X mention from @{author_username} was explicitly ignored")
1802 return "ignored"
1803 else:
1804 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
1805 return False # Keep in queue for retry instead of removing
1806
1807 except Exception as e:
1808 logger.error(f"Error processing X mention: {e}")
1809 return False
1810
1811def acknowledge_x_post(x_client, post_id, note=None):
1812 """
1813 Acknowledge an X post that we replied to.
1814 Uses the same Bluesky client and uploads to the void data repository on atproto,
1815 just like Bluesky acknowledgments.
1816
1817 Args:
1818 x_client: XClient instance (not used, kept for compatibility)
1819 post_id: The X post ID we're acknowledging
1820 note: Optional note to include with the acknowledgment
1821
1822 Returns:
1823 True if successful, False otherwise
1824 """
1825 try:
1826 # Use Bluesky client to upload acks to the void data repository on atproto
1827 bsky_client = bsky_utils.default_login()
1828
1829 # Create a synthetic URI and CID for the X post
1830 # X posts don't have atproto URIs/CIDs, so we create identifiers
1831 post_uri = f"x://twitter.com/post/{post_id}"
1832 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts
1833
1834 # Use the same acknowledge_post function as Bluesky
1835 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)
1836
1837 if ack_result:
1838 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
1839 return True
1840 else:
1841 logger.error(f"Failed to acknowledge X post {post_id}")
1842 return False
1843
1844 except Exception as e:
1845 logger.error(f"Error acknowledging X post {post_id}: {e}")
1846 return False
1847
1848def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
1849 """
1850 Post a series of replies to X, threading them properly.
1851
1852 Args:
1853 x_client: XClient instance
1854 in_reply_to_tweet_id: The original tweet ID to reply to
1855 reply_messages: List of reply text strings
1856
1857 Returns:
1858 True if successful, False otherwise
1859 """
1860 try:
1861 current_reply_id = in_reply_to_tweet_id
1862
1863 for i, reply_text in enumerate(reply_messages):
1864 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")
1865
1866 result = x_client.post_reply(reply_text, current_reply_id)
1867
1868 if result and 'data' in result:
1869 new_tweet_id = result['data']['id']
1870 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
1871 # For threading, the next reply should reply to this one
1872 current_reply_id = new_tweet_id
1873 else:
1874 logger.error(f"Failed to post X reply {i+1}")
1875 return False
1876
1877 return True
1878
1879 except Exception as e:
1880 logger.error(f"Error posting X thread replies: {e}")
1881 return False
1882
1883def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
1884 """
1885 Load and process all X mentions from the queue.
1886 Similar to bsky.py load_and_process_queued_notifications but for X.
1887 """
1888 try:
1889 # Get all X mention files in queue directory
1890 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1891
1892 if not queue_files:
1893 return
1894
1895 # Load file metadata and sort by creation time (chronological order)
1896 file_metadata = []
1897 for filepath in queue_files:
1898 try:
1899 with open(filepath, 'r') as f:
1900 queue_data = json.load(f)
1901 mention_data = queue_data.get('mention', queue_data)
1902 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing
1903 file_metadata.append((created_at, filepath))
1904 except Exception as e:
1905 logger.warning(f"Error reading queue file {filepath.name}: {e}")
1906 # Add with default timestamp so it still gets processed
1907 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))
1908
1909 # Sort by creation time (oldest first)
1910 file_metadata.sort(key=lambda x: x[0])
1911
1912 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")
1913
1914 for i, (created_at, filepath) in enumerate(file_metadata, 1):
1915 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")
1916
1917 try:
1918 # Load mention data
1919 with open(filepath, 'r') as f:
1920 queue_data = json.load(f)
1921
1922 mention_data = queue_data.get('mention', queue_data)
1923
1924 # Process the mention
1925 success = process_x_mention(void_agent, x_client, mention_data,
1926 queue_filepath=filepath, testing_mode=testing_mode)
1927
1928 except XRateLimitError:
1929 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
1930 break
1931
1932 except Exception as e:
1933 logger.error(f"Error processing X queue file {filepath.name}: {e}")
1934 continue
1935
1936 # Handle file based on processing result
1937 if success:
1938 if testing_mode:
1939 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
1940 else:
1941 filepath.unlink()
1942 logger.info(f"Successfully processed and removed X file: {filepath.name}")
1943
1944 # Mark as processed
1945 processed_mentions = load_processed_mentions()
1946 processed_mentions.add(mention_data.get('id'))
1947 save_processed_mentions(processed_mentions)
1948
1949 elif success is None: # Move to error directory
1950 error_dir = X_QUEUE_DIR / "errors"
1951 error_dir.mkdir(exist_ok=True)
1952 error_path = error_dir / filepath.name
1953 filepath.rename(error_path)
1954 logger.warning(f"Moved X file {filepath.name} to errors directory")
1955
1956 elif success == "no_reply": # Move to no_reply directory
1957 no_reply_dir = X_QUEUE_DIR / "no_reply"
1958 no_reply_dir.mkdir(exist_ok=True)
1959 no_reply_path = no_reply_dir / filepath.name
1960 filepath.rename(no_reply_path)
1961 logger.info(f"Moved X file {filepath.name} to no_reply directory")
1962
1963 elif success == "ignored": # Delete ignored notifications
1964 filepath.unlink()
1965 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")
1966
1967 else:
1968 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")
1969
1970 except Exception as e:
1971 logger.error(f"Error loading queued X mentions: {e}")
1972
1973def process_x_notifications(void_agent, x_client, testing_mode=False):
1974 """
1975 Fetch new X mentions, queue them, and process the queue.
1976 Similar to bsky.py process_notifications but for X.
1977 """
1978 try:
1979 # Get username for fetching mentions - uses cached data to avoid rate limits
1980 username = x_client.get_username()
1981 if not username:
1982 logger.error("Could not get username for X mentions")
1983 return
1984
1985 # Fetch and queue new mentions
1986 new_count = fetch_and_queue_mentions(username)
1987
1988 if new_count > 0:
1989 logger.info(f"Found {new_count} new X mentions to process")
1990
1991 # Process the entire queue
1992 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
1993
1994 except Exception as e:
1995 logger.error(f"Error processing X notifications: {e}")
1996
1997def initialize_x_void():
1998 """Initialize the void agent for X operations."""
1999 logger.info("Starting void agent initialization for X...")
2000
2001 from letta_client import Letta
2002
2003 # Get config
2004 config = get_x_letta_config()
2005 client = Letta(token=config['api_key'], timeout=config['timeout'])
2006 agent_id = config['agent_id']
2007
2008 try:
2009 void_agent = client.agents.retrieve(agent_id=agent_id)
2010 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
2011 except Exception as e:
2012 logger.error(f"Failed to load void agent {agent_id}: {e}")
2013 raise e
2014
2015 # Ensure correct tools are attached for X
2016 logger.info("Configuring tools for X platform...")
2017 try:
2018 from tool_manager import ensure_platform_tools
2019 ensure_platform_tools('x', void_agent.id, config['api_key'])
2020 except Exception as e:
2021 logger.error(f"Failed to configure platform tools: {e}")
2022 logger.warning("Continuing with existing tool configuration")
2023
2024 # Log agent details
2025 logger.info(f"X Void agent details - ID: {void_agent.id}")
2026 logger.info(f"Agent name: {void_agent.name}")
2027
2028 return void_agent
2029
2030def x_main_loop(testing_mode=False, cleanup_interval=10):
2031 """
2032 Main X bot loop that continuously monitors for mentions and processes them.
2033 Similar to bsky.py main() but for X/Twitter.
2034
2035 Args:
2036 testing_mode: If True, don't actually post to X
2037 cleanup_interval: Run user block cleanup every N cycles (0 to disable)
2038 """
2039 import time
2040 from time import sleep
2041 from letta_client import Letta
2042
2043 logger.info("=== STARTING X VOID BOT ===")
2044
2045 # Configure logging from config file
2046 setup_logging_from_config()
2047
2048 # Initialize void agent
2049 void_agent = initialize_x_void()
2050 logger.info(f"X void agent initialized: {void_agent.id}")
2051
2052 # Initialize X client
2053 x_client = create_x_client()
2054 logger.info("Connected to X API")
2055
2056 # Get Letta client for periodic cleanup
2057 config = get_x_letta_config()
2058 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
2059
2060 # Main loop
2061 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls)
2062 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")
2063
2064 if testing_mode:
2065 logger.info("=== RUNNING IN X TESTING MODE ===")
2066 logger.info(" - No messages will be sent to X")
2067 logger.info(" - Queue files will not be deleted")
2068
2069 if cleanup_interval > 0:
2070 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
2071 else:
2072 logger.info("User block cleanup disabled")
2073
2074 cycle_count = 0
2075 start_time = time.time()
2076
2077 while True:
2078 try:
2079 cycle_count += 1
2080 logger.info(f"=== X CYCLE {cycle_count} ===")
2081
2082 # Process X notifications (fetch, queue, and process)
2083 process_x_notifications(void_agent, x_client, testing_mode)
2084
2085 # Log cycle completion
2086 elapsed_time = time.time() - start_time
2087 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")
2088
2089 sleep(FETCH_DELAY_SEC)
2090
2091 except KeyboardInterrupt:
2092 elapsed_time = time.time() - start_time
2093 logger.info("=== X BOT STOPPED BY USER ===")
2094 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
2095 break
2096 except Exception as e:
2097 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
2098 logger.error(f"Error details: {e}")
2099 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
2100 sleep(FETCH_DELAY_SEC * 2)
2101
2102def process_queue_only(testing_mode=False):
2103 """
2104 Process all queued X mentions without fetching new ones.
2105 Useful for rate limit management - queue first, then process separately.
2106
2107 Args:
2108 testing_mode: If True, don't actually post to X and keep queue files
2109 """
2110 logger.info("=== PROCESSING X QUEUE ONLY ===")
2111
2112 if testing_mode:
2113 logger.info("=== RUNNING IN X TESTING MODE ===")
2114 logger.info(" - No messages will be sent to X")
2115 logger.info(" - Queue files will not be deleted")
2116
2117 try:
2118 # Initialize void agent
2119 void_agent = initialize_x_void()
2120 logger.info(f"X void agent initialized: {void_agent.id}")
2121
2122 # Initialize X client
2123 x_client = create_x_client()
2124 logger.info("Connected to X API")
2125
2126 # Process the queue without fetching new mentions
2127 logger.info("Processing existing X queue...")
2128 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
2129
2130 logger.info("=== X QUEUE PROCESSING COMPLETE ===")
2131
2132 except Exception as e:
2133 logger.error(f"Error processing X queue: {e}")
2134 raise
2135
2136def x_notification_loop():
2137 """
2138 DEPRECATED: Old X notification loop using search-based mention detection.
2139 Use x_main_loop() instead for the full bot experience.
2140 """
2141 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
2142 x_main_loop()
2143
2144if __name__ == "__main__":
2145 import sys
2146 import argparse
2147
2148 if len(sys.argv) > 1:
2149 if sys.argv[1] == "bot":
2150 # Main bot with optional --test flag and cleanup interval
2151 parser = argparse.ArgumentParser(description='X Void Bot')
2152 parser.add_argument('command', choices=['bot'])
2153 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
2154 parser.add_argument('--cleanup-interval', type=int, default=10,
2155 help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
2156 args = parser.parse_args()
2157 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval)
2158 elif sys.argv[1] == "loop":
2159 x_notification_loop()
2160 elif sys.argv[1] == "reply":
2161 reply_to_cameron_post()
2162 elif sys.argv[1] == "me":
2163 get_my_user_info()
2164 elif sys.argv[1] == "search":
2165 test_search_mentions()
2166 elif sys.argv[1] == "queue":
2167 test_fetch_and_queue()
2168 elif sys.argv[1] == "thread":
2169 test_thread_context()
2170 elif sys.argv[1] == "process":
2171 # Process all queued mentions with optional --test flag
2172 testing_mode = "--test" in sys.argv
2173 process_queue_only(testing_mode=testing_mode)
2174 elif sys.argv[1] == "letta":
2175 # Use specific agent ID if provided, otherwise use from config
2176 agent_id = sys.argv[2] if len(sys.argv) > 2 else None
2177 test_letta_integration(agent_id)
2178 elif sys.argv[1] == "downrank":
2179 # View or manage downrank list
2180 if len(sys.argv) > 2 and sys.argv[2] == "list":
2181 downrank_users = load_downrank_users()
2182 if downrank_users:
2183 print(f"📋 Downrank users ({len(downrank_users)} total):")
2184 for user_id in sorted(downrank_users):
2185 print(f" - {user_id}")
2186 else:
2187 print("📋 No downrank users configured")
2188 else:
2189 print("Usage: python x.py downrank list")
2190 print(" list - Show all downranked user IDs")
2191 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list")
2192 else:
2193 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
2194 print(" bot - Run the main X bot (use --test for testing mode)")
2195 print(" Example: python x.py bot --test")
2196 print(" queue - Fetch and queue mentions only (no processing)")
2197 print(" process - Process all queued mentions only (no fetching)")
2198 print(" Example: python x.py process --test")
2199 print(" downrank - Manage downrank users (10% response rate)")
2200 print(" Example: python x.py downrank list")
2201 print(" loop - Run the old notification monitoring loop (deprecated)")
2202 print(" reply - Reply to Cameron's specific post")
2203 print(" me - Get authenticated user info and correct user ID")
2204 print(" search - Test search-based mention detection")
2205 print(" thread - Test thread context retrieval from queued mention")
2206 print(" letta - Test sending thread context to Letta agent")
2207 print(" Optional: python x.py letta <agent-id>")
2208 else:
2209 test_x_client()