a digital person for bluesky
1import os
2import logging
3import requests
4import yaml
5import json
6import hashlib
7import random
8import time
9from typing import Optional, Dict, Any, List, Set
10from datetime import datetime
11from pathlib import Path
12from requests_oauthlib import OAuth1
13from rich import print as rprint
14from rich.panel import Panel
15from rich.text import Text
16
17import bsky_utils
18
19class XRateLimitError(Exception):
20 """Exception raised when X API rate limit is exceeded"""
21 pass
22
23
24# Configure logging
25logging.basicConfig(
26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
27)
28logger = logging.getLogger("x_client")
29
30# X-specific file paths
31X_QUEUE_DIR = Path("x_queue")
32X_CACHE_DIR = Path("x_cache")
33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")
36
37class XClient:
38 """X (Twitter) API client for fetching mentions and managing interactions."""
39
40 def __init__(self, api_key: str, user_id: str, access_token: str = None,
41 consumer_key: str = None, consumer_secret: str = None,
42 access_token_secret: str = None):
43 self.api_key = api_key
44 self.access_token = access_token
45 self.user_id = user_id
46 self.base_url = "https://api.x.com/2"
47
48 # Check if we have OAuth 1.0a credentials
49 if (consumer_key and consumer_secret and access_token and access_token_secret):
50 # Use OAuth 1.0a for User Context
51 self.oauth = OAuth1(
52 consumer_key,
53 client_secret=consumer_secret,
54 resource_owner_key=access_token,
55 resource_owner_secret=access_token_secret
56 )
57 self.headers = {"Content-Type": "application/json"}
58 self.auth_method = "oauth1a"
59 logger.info("Using OAuth 1.0a User Context authentication for X API")
60 elif access_token:
61 # Use OAuth 2.0 Bearer token for User Context
62 self.oauth = None
63 self.headers = {
64 "Authorization": f"Bearer {access_token}",
65 "Content-Type": "application/json"
66 }
67 self.auth_method = "oauth2_user"
68 logger.info("Using OAuth 2.0 User Context access token for X API")
69 else:
70 # Use Application-Only Bearer token
71 self.oauth = None
72 self.headers = {
73 "Authorization": f"Bearer {api_key}",
74 "Content-Type": "application/json"
75 }
76 self.auth_method = "bearer"
77 logger.info("Using Application-Only Bearer token for X API")
78
79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
80 """Make a request to the X API with proper error handling and exponential backoff."""
81 url = f"{self.base_url}{endpoint}"
82
83 for attempt in range(max_retries):
84 try:
85 if method.upper() == "GET":
86 if self.oauth:
87 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
88 else:
89 response = requests.get(url, headers=self.headers, params=params)
90 elif method.upper() == "POST":
91 if self.oauth:
92 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
93 else:
94 response = requests.post(url, headers=self.headers, json=data)
95 else:
96 raise ValueError(f"Unsupported HTTP method: {method}")
97
98 response.raise_for_status()
99 return response.json()
100
101 except requests.exceptions.HTTPError as e:
102 if response.status_code == 401:
103 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
104 logger.error(f"Response: {response.text}")
105 return None # Don't retry auth failures
106 elif response.status_code == 403:
107 logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
108 logger.error(f"Response: {response.text}")
109 return None # Don't retry permission failures
110 elif response.status_code == 429:
111 if attempt < max_retries - 1:
112 # Exponential backoff: 60s, 120s, 240s
113 backoff_time = 60 * (2 ** attempt)
114 logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
115 logger.error(f"Response: {response.text}")
116 time.sleep(backoff_time)
117 continue
118 else:
119 logger.error("X API rate limit exceeded - max retries reached")
120 logger.error(f"Response: {response.text}")
121 raise XRateLimitError("X API rate limit exceeded")
122 else:
123 if attempt < max_retries - 1:
124 # Exponential backoff for other HTTP errors too
125 backoff_time = 30 * (2 ** attempt)
126 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
127 logger.error(f"Response: {response.text}")
128 time.sleep(backoff_time)
129 continue
130 else:
131 logger.error(f"X API request failed after {max_retries} attempts: {e}")
132 logger.error(f"Response: {response.text}")
133 return None
134
135 except Exception as e:
136 if attempt < max_retries - 1:
137 backoff_time = 15 * (2 ** attempt)
138 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
139 time.sleep(backoff_time)
140 continue
141 else:
142 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
143 return None
144
145 return None
146
147 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
148 """
149 Fetch mentions for the configured user.
150
151 Args:
152 since_id: Minimum Post ID to include (for getting newer mentions)
153 max_results: Number of results to return (5-100)
154
155 Returns:
156 List of mention objects or None if request failed
157 """
158 endpoint = f"/users/{self.user_id}/mentions"
159 params = {
160 "max_results": min(max(max_results, 5), 100), # Ensure within API limits
161 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
162 "user.fields": "id,name,username",
163 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
164 }
165
166 if since_id:
167 params["since_id"] = since_id
168
169 logger.info(f"Fetching mentions for user {self.user_id}")
170 response = self._make_request(endpoint, params)
171
172 if response:
173 logger.debug(f"X API response: {response}")
174
175 if response and "data" in response:
176 mentions = response["data"]
177 logger.info(f"Retrieved {len(mentions)} mentions")
178 return mentions
179 else:
180 if response:
181 logger.info(f"No mentions in response. Full response: {response}")
182 else:
183 logger.warning("Request failed - no response received")
184 return []
185
186 def get_user_info(self, user_id: str) -> Optional[Dict]:
187 """Get information about a specific user."""
188 endpoint = f"/users/{user_id}"
189 params = {
190 "user.fields": "id,name,username,description,public_metrics"
191 }
192
193 response = self._make_request(endpoint, params)
194 return response.get("data") if response else None
195
196 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]:
197 """
198 Search for mentions using the search endpoint instead of mentions endpoint.
199 This might have better rate limits than the direct mentions endpoint.
200
201 Args:
202 username: Username to search for mentions of (without @)
203 max_results: Number of results to return (10-100)
204 since_id: Only return results newer than this tweet ID
205
206 Returns:
207 List of tweets mentioning the username
208 """
209 endpoint = "/tweets/search/recent"
210
211 # Search for mentions of the username
212 query = f"@{username}"
213
214 params = {
215 "query": query,
216 "max_results": min(max(max_results, 10), 100),
217 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
218 "user.fields": "id,name,username",
219 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
220 }
221
222 if since_id:
223 params["since_id"] = since_id
224
225 logger.info(f"Searching for mentions of @{username}")
226 response = self._make_request(endpoint, params)
227
228 if response and "data" in response:
229 tweets = response["data"]
230 logger.info(f"Found {len(tweets)} mentions via search")
231 return tweets
232 else:
233 if response:
234 logger.info(f"No mentions found via search. Response: {response}")
235 else:
236 logger.warning("Search request failed")
237 return []
238
239 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]:
240 """
241 Get all tweets in a conversation thread up to a specific tweet ID.
242
243 Args:
244 conversation_id: The conversation ID to fetch (should be the original tweet ID)
245 use_cache: Whether to use cached data if available
246 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)
247
248 Returns:
249 List of tweets in the conversation, ordered chronologically
250 """
251 # Check cache first if enabled
252 if use_cache:
253 cached_data = get_cached_thread_context(conversation_id)
254 if cached_data:
255 return cached_data
256
257 # First, get the original tweet directly since it might not appear in conversation search
258 original_tweet = None
259 try:
260 endpoint = f"/tweets/{conversation_id}"
261 params = {
262 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
263 "user.fields": "id,name,username",
264 "expansions": "author_id"
265 }
266 response = self._make_request(endpoint, params)
267 if response and "data" in response:
268 original_tweet = response["data"]
269 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
270 except Exception as e:
271 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")
272
273 # Then search for all tweets in this conversation
274 endpoint = "/tweets/search/recent"
275 params = {
276 "query": f"conversation_id:{conversation_id}",
277 "max_results": 100, # Get as many as possible
278 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
279 "user.fields": "id,name,username",
280 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
281 "sort_order": "recency" # Get newest first, we'll reverse later
282 }
283
284 # Add until_id parameter to exclude tweets after the mention being processed
285 if until_id:
286 params["until_id"] = until_id
287 logger.info(f"Using until_id={until_id} to exclude future tweets")
288
289 logger.info(f"Fetching thread context for conversation {conversation_id}")
290 response = self._make_request(endpoint, params)
291
292 tweets = []
293 users_data = {}
294
295 # Collect tweets from search
296 if response and "data" in response:
297 tweets.extend(response["data"])
298 # Store user data for reference
299 if "includes" in response and "users" in response["includes"]:
300 for user in response["includes"]["users"]:
301 users_data[user["id"]] = user
302
303 # Add original tweet if we got it and it's not already in the list
304 if original_tweet:
305 tweet_ids = [t.get('id') for t in tweets]
306 if original_tweet.get('id') not in tweet_ids:
307 tweets.append(original_tweet)
308 logger.info("Added original tweet to thread context")
309
310 # Attempt to fill gaps by fetching referenced tweets that are missing
311 # This helps with X API's incomplete conversation search results
312 tweet_ids = set(t.get('id') for t in tweets)
313 missing_tweet_ids = set()
314 critical_missing_ids = set()
315
316 # Collect referenced tweet IDs, prioritizing critical ones
317 for tweet in tweets:
318 referenced_tweets = tweet.get('referenced_tweets', [])
319 for ref in referenced_tweets:
320 ref_id = ref.get('id')
321 ref_type = ref.get('type')
322 if ref_id and ref_id not in tweet_ids:
323 missing_tweet_ids.add(ref_id)
324 # Prioritize direct replies and quoted tweets over retweets
325 if ref_type in ['replied_to', 'quoted']:
326 critical_missing_ids.add(ref_id)
327
328 # For rate limit efficiency, only fetch critical missing tweets if we have many
329 if len(missing_tweet_ids) > 10:
330 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
331 missing_tweet_ids = critical_missing_ids
332
333 # Context sufficiency check - skip backfill if we already have enough context
334 if has_sufficient_context(tweets, missing_tweet_ids):
335 logger.info("Thread has sufficient context, skipping missing tweet backfill")
336 missing_tweet_ids = set()
337
338 # Fetch missing referenced tweets in batches (more rate-limit friendly)
339 if missing_tweet_ids:
340 missing_list = list(missing_tweet_ids)
341
342 # First, check cache for missing tweets
343 cached_tweets = get_cached_tweets(missing_list)
344 for tweet_id, cached_tweet in cached_tweets.items():
345 if cached_tweet.get('conversation_id') == conversation_id:
346 tweets.append(cached_tweet)
347 tweet_ids.add(tweet_id)
348 logger.info(f"Retrieved missing tweet from cache: {tweet_id}")
349
350 # Add user data if available in cache
351 if cached_tweet.get('author_info'):
352 author_id = cached_tweet.get('author_id')
353 if author_id:
354 users_data[author_id] = cached_tweet['author_info']
355
356 # Only fetch tweets that weren't found in cache
357 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]
358
359 if uncached_ids:
360 batch_size = 100 # X API limit for bulk tweet lookup
361
362 for i in range(0, len(uncached_ids), batch_size):
363 batch_ids = uncached_ids[i:i + batch_size]
364 try:
365 endpoint = "/tweets"
366 params = {
367 "ids": ",".join(batch_ids),
368 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
369 "user.fields": "id,name,username",
370 "expansions": "author_id"
371 }
372 response = self._make_request(endpoint, params)
373
374 if response and "data" in response:
375 fetched_tweets = []
376 batch_users_data = {}
377
378 for missing_tweet in response["data"]:
379 # Only add if it's actually part of this conversation
380 if missing_tweet.get('conversation_id') == conversation_id:
381 tweets.append(missing_tweet)
382 tweet_ids.add(missing_tweet.get('id'))
383 fetched_tweets.append(missing_tweet)
384 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")
385
386 # Add user data if available
387 if "includes" in response and "users" in response["includes"]:
388 for user in response["includes"]["users"]:
389 users_data[user["id"]] = user
390 batch_users_data[user["id"]] = user
391
392 # Cache the newly fetched tweets
393 if fetched_tweets:
394 save_cached_tweets(fetched_tweets, batch_users_data)
395
396 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")
397
398 # Handle partial success - log any missing tweets that weren't found
399 if response and "errors" in response:
400 for error in response["errors"]:
401 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")
402
403 except Exception as e:
404 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
405 else:
406 logger.info(f"All {len(missing_list)} missing tweets found in cache")
407
408 if tweets:
409 # Filter out tweets that occur after until_id (if specified)
410 if until_id:
411 original_count = len(tweets)
412 # Convert until_id to int for comparison (Twitter IDs are sequential)
413 until_id_int = int(until_id)
414 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
415 filtered_count = len(tweets)
416 if original_count != filtered_count:
417 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")
418
419 # Sort chronologically (oldest first)
420 tweets.sort(key=lambda x: x.get('created_at', ''))
421 logger.info(f"Retrieved {len(tweets)} tweets in thread")
422
423 thread_data = {"tweets": tweets, "users": users_data}
424
425 # Cache individual tweets from the thread for future backfill
426 save_cached_tweets(tweets, users_data)
427
428 # Cache the result
429 if use_cache:
430 save_cached_thread_context(conversation_id, thread_data)
431
432 return thread_data
433 else:
434 logger.warning("No tweets found for thread context")
435 return None
436
437 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
438 """
439 Post a reply to a specific tweet.
440
441 Args:
442 reply_text: The text content of the reply
443 in_reply_to_tweet_id: The ID of the tweet to reply to
444
445 Returns:
446 Response data if successful, None if failed
447 """
448 endpoint = "/tweets"
449
450 payload = {
451 "text": reply_text,
452 "reply": {
453 "in_reply_to_tweet_id": in_reply_to_tweet_id
454 }
455 }
456
457 logger.info(f"Attempting to post reply with {self.auth_method} authentication")
458 result = self._make_request(endpoint, method="POST", data=payload)
459
460 if result:
461 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
462 return result
463 else:
464 logger.error("Failed to post reply")
465 return None
466
467def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
468 """Load X configuration from config file."""
469 try:
470 with open(config_path, 'r') as f:
471 config = yaml.safe_load(f)
472
473 x_config = config.get('x', {})
474 if not x_config.get('api_key') or not x_config.get('user_id'):
475 raise ValueError("X API key and user_id must be configured in config.yaml")
476
477 return x_config
478 except Exception as e:
479 logger.error(f"Failed to load X configuration: {e}")
480 raise
481
482def create_x_client(config_path: str = "config.yaml") -> XClient:
483 """Create and return an X client with configuration loaded from file."""
484 config = load_x_config(config_path)
485 return XClient(
486 api_key=config['api_key'],
487 user_id=config['user_id'],
488 access_token=config.get('access_token'),
489 consumer_key=config.get('consumer_key'),
490 consumer_secret=config.get('consumer_secret'),
491 access_token_secret=config.get('access_token_secret')
492 )
493
494def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
495 """
496 Convert a mention object to a YAML string for better AI comprehension.
497 Similar to thread_to_yaml_string in bsky_utils.py
498 """
499 # Extract relevant fields
500 simplified_mention = {
501 'id': mention.get('id'),
502 'text': mention.get('text'),
503 'author_id': mention.get('author_id'),
504 'created_at': mention.get('created_at'),
505 'in_reply_to_user_id': mention.get('in_reply_to_user_id')
506 }
507
508 # Add user information if available
509 if users_data and mention.get('author_id') in users_data:
510 user = users_data[mention.get('author_id')]
511 simplified_mention['author'] = {
512 'username': user.get('username'),
513 'name': user.get('name')
514 }
515
516 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)
517
518def thread_to_yaml_string(thread_data: Dict) -> str:
519 """
520 Convert X thread context to YAML string for AI comprehension.
521 Similar to Bluesky's thread_to_yaml_string function.
522
523 Args:
524 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
525
526 Returns:
527 YAML string representation of the thread
528 """
529 if not thread_data or "tweets" not in thread_data:
530 return "conversation: []\n"
531
532 tweets = thread_data["tweets"]
533 users_data = thread_data.get("users", {})
534
535 simplified_thread = {
536 "conversation": []
537 }
538
539 for tweet in tweets:
540 # Get user info
541 author_id = tweet.get('author_id')
542 author_info = {}
543 if author_id and author_id in users_data:
544 user = users_data[author_id]
545 author_info = {
546 'username': user.get('username'),
547 'name': user.get('name')
548 }
549
550 # Build tweet object (simplified for AI consumption)
551 tweet_obj = {
552 'text': tweet.get('text'),
553 'created_at': tweet.get('created_at'),
554 'author': author_info,
555 'author_id': author_id # Include user ID for block management
556 }
557
558 simplified_thread["conversation"].append(tweet_obj)
559
560 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)
561
562
563def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
564 """
565 Ensure all users in the thread have their X user blocks attached.
566 Creates blocks with initial content including their handle if they don't exist.
567
568 Args:
569 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
570 agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
571 """
572 if not thread_data or "users" not in thread_data:
573 return
574
575 try:
576 from tools.blocks import attach_x_user_blocks, x_user_note_set
577 from config_loader import get_letta_config
578 from letta_client import Letta
579
580 # Get Letta client and agent_id from config
581 config = get_letta_config()
582 client = Letta(token=config['api_key'], timeout=config['timeout'])
583
584 # Use provided agent_id or get from config
585 if agent_id is None:
586 agent_id = config['agent_id']
587
588 # Get agent info to create a mock agent_state for the functions
589 class MockAgentState:
590 def __init__(self, agent_id):
591 self.id = agent_id
592
593 agent_state = MockAgentState(agent_id)
594
595 users_data = thread_data["users"]
596 user_ids = list(users_data.keys())
597
598 if not user_ids:
599 return
600
601 logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")
602
603 # Get current blocks to check which users already have blocks with content
604 current_blocks = client.agents.blocks.list(agent_id=agent_id)
605 existing_user_blocks = {}
606
607 for block in current_blocks:
608 if block.label.startswith("x_user_"):
609 user_id = block.label.replace("x_user_", "")
610 existing_user_blocks[user_id] = block
611
612 # Attach all user blocks (this will create missing ones with basic content)
613 attach_result = attach_x_user_blocks(user_ids, agent_state)
614 logger.info(f"X user block attachment result: {attach_result}")
615
616 # For newly created blocks, update with user handle information
617 for user_id in user_ids:
618 if user_id not in existing_user_blocks:
619 user_info = users_data[user_id]
620 username = user_info.get('username', 'unknown')
621 name = user_info.get('name', 'Unknown')
622
623 # Set initial content with handle information
624 initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."
625
626 try:
627 x_user_note_set(user_id, initial_content, agent_state)
628 logger.info(f"Set initial content for X user {user_id} (@{username})")
629 except Exception as e:
630 logger.error(f"Failed to set initial content for X user {user_id}: {e}")
631
632 except Exception as e:
633 logger.error(f"Error ensuring X user blocks: {e}")
634
635
636# X Caching and Queue System Functions
637
638def load_last_seen_id() -> Optional[str]:
639 """Load the last seen mention ID for incremental fetching."""
640 if X_LAST_SEEN_FILE.exists():
641 try:
642 with open(X_LAST_SEEN_FILE, 'r') as f:
643 data = json.load(f)
644 return data.get('last_seen_id')
645 except Exception as e:
646 logger.error(f"Error loading last seen ID: {e}")
647 return None
648
649def save_last_seen_id(mention_id: str):
650 """Save the last seen mention ID."""
651 try:
652 X_QUEUE_DIR.mkdir(exist_ok=True)
653 with open(X_LAST_SEEN_FILE, 'w') as f:
654 json.dump({
655 'last_seen_id': mention_id,
656 'updated_at': datetime.now().isoformat()
657 }, f)
658 logger.debug(f"Saved last seen ID: {mention_id}")
659 except Exception as e:
660 logger.error(f"Error saving last seen ID: {e}")
661
662def load_processed_mentions() -> set:
663 """Load the set of processed mention IDs."""
664 if X_PROCESSED_MENTIONS_FILE.exists():
665 try:
666 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
667 data = json.load(f)
668 # Keep only recent entries (last 10000)
669 if len(data) > 10000:
670 data = data[-10000:]
671 save_processed_mentions(set(data))
672 return set(data)
673 except Exception as e:
674 logger.error(f"Error loading processed mentions: {e}")
675 return set()
676
677def save_processed_mentions(processed_set: set):
678 """Save the set of processed mention IDs."""
679 try:
680 X_QUEUE_DIR.mkdir(exist_ok=True)
681 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
682 json.dump(list(processed_set), f)
683 except Exception as e:
684 logger.error(f"Error saving processed mentions: {e}")
685
686def load_downrank_users() -> Set[str]:
687 """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
688 try:
689 if not X_DOWNRANK_USERS_FILE.exists():
690 return set()
691
692 downrank_users = set()
693 with open(X_DOWNRANK_USERS_FILE, 'r') as f:
694 for line in f:
695 line = line.strip()
696 # Skip empty lines and comments
697 if line and not line.startswith('#'):
698 downrank_users.add(line)
699
700 logger.info(f"Loaded {len(downrank_users)} downrank users")
701 return downrank_users
702 except Exception as e:
703 logger.error(f"Error loading downrank users: {e}")
704 return set()
705
706def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
707 """
708 Check if we should respond to a downranked user.
709 Returns True 10% of the time for downranked users, True 100% of the time for others.
710 """
711 if user_id not in downrank_users:
712 return True
713
714 # 10% chance for downranked users
715 should_respond = random.random() < 0.1
716 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
717 return should_respond
718
719def save_mention_to_queue(mention: Dict):
720 """Save a mention to the queue directory for async processing."""
721 try:
722 mention_id = mention.get('id')
723 if not mention_id:
724 logger.error("Mention missing ID, cannot queue")
725 return
726
727 # Check if already processed
728 processed_mentions = load_processed_mentions()
729 if mention_id in processed_mentions:
730 logger.debug(f"Mention {mention_id} already processed, skipping")
731 return
732
733 # Create queue directory
734 X_QUEUE_DIR.mkdir(exist_ok=True)
735
736 # Create filename using hash (similar to Bluesky system)
737 mention_str = json.dumps(mention, sort_keys=True)
738 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
739 filename = f"x_mention_{mention_hash}.json"
740
741 queue_file = X_QUEUE_DIR / filename
742
743 # Save mention data with enhanced debugging information
744 mention_data = {
745 'mention': mention,
746 'queued_at': datetime.now().isoformat(),
747 'type': 'x_mention',
748 # Debug info for conversation tracking
749 'debug_info': {
750 'mention_id': mention.get('id'),
751 'author_id': mention.get('author_id'),
752 'conversation_id': mention.get('conversation_id'),
753 'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
754 'referenced_tweets': mention.get('referenced_tweets', []),
755 'text_preview': mention.get('text', '')[:200],
756 'created_at': mention.get('created_at'),
757 'public_metrics': mention.get('public_metrics', {}),
758 'context_annotations': mention.get('context_annotations', [])
759 }
760 }
761
762 with open(queue_file, 'w') as f:
763 json.dump(mention_data, f, indent=2)
764
765 logger.info(f"Queued X mention {mention_id} -> {filename}")
766
767 except Exception as e:
768 logger.error(f"Error saving mention to queue: {e}")
769
770# X Cache Functions
771def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
772 """Load cached thread context if available."""
773 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
774 if cache_file.exists():
775 try:
776 with open(cache_file, 'r') as f:
777 cached_data = json.load(f)
778 # Check if cache is recent (within 1 hour)
779 from datetime import datetime, timedelta
780 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
781 if datetime.now() - cached_time < timedelta(hours=1):
782 logger.info(f"Using cached thread context for {conversation_id}")
783 return cached_data.get('thread_data')
784 except Exception as e:
785 logger.warning(f"Error loading cached thread context: {e}")
786 return None
787
788def save_cached_thread_context(conversation_id: str, thread_data: Dict):
789 """Save thread context to cache."""
790 try:
791 X_CACHE_DIR.mkdir(exist_ok=True)
792 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
793
794 cache_data = {
795 'conversation_id': conversation_id,
796 'thread_data': thread_data,
797 'cached_at': datetime.now().isoformat()
798 }
799
800 with open(cache_file, 'w') as f:
801 json.dump(cache_data, f, indent=2)
802
803 logger.debug(f"Cached thread context for {conversation_id}")
804 except Exception as e:
805 logger.error(f"Error caching thread context: {e}")
806
807def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
808 """
809 Load cached individual tweets if available.
810 Returns dict mapping tweet_id -> tweet_data for found tweets.
811 """
812 cached_tweets = {}
813
814 for tweet_id in tweet_ids:
815 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
816 if cache_file.exists():
817 try:
818 with open(cache_file, 'r') as f:
819 cached_data = json.load(f)
820
821 # Use longer cache times for older tweets (24 hours vs 1 hour)
822 from datetime import datetime, timedelta
823 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
824 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')
825
826 # Parse tweet creation time to determine age
827 try:
828 from dateutil.parser import parse
829 tweet_age = datetime.now() - parse(tweet_created)
830 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
831 except:
832 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails
833
834 if datetime.now() - cached_time < cache_duration:
835 cached_tweets[tweet_id] = cached_data.get('tweet_data')
836 logger.debug(f"Using cached tweet {tweet_id}")
837
838 except Exception as e:
839 logger.warning(f"Error loading cached tweet {tweet_id}: {e}")
840
841 return cached_tweets
842
843def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
844 """Save individual tweets to cache for future reuse."""
845 try:
846 X_CACHE_DIR.mkdir(exist_ok=True)
847
848 for tweet in tweets_data:
849 tweet_id = tweet.get('id')
850 if not tweet_id:
851 continue
852
853 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
854
855 # Include user data if available
856 tweet_with_user = tweet.copy()
857 if users_data and tweet.get('author_id') in users_data:
858 tweet_with_user['author_info'] = users_data[tweet.get('author_id')]
859
860 cache_data = {
861 'tweet_id': tweet_id,
862 'tweet_data': tweet_with_user,
863 'cached_at': datetime.now().isoformat()
864 }
865
866 with open(cache_file, 'w') as f:
867 json.dump(cache_data, f, indent=2)
868
869 logger.debug(f"Cached individual tweet {tweet_id}")
870
871 except Exception as e:
872 logger.error(f"Error caching individual tweets: {e}")
873
874def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
875 """
876 Determine if we have sufficient context to skip backfilling missing tweets.
877
878 Args:
879 tweets: List of tweets already in the thread
880 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch
881
882 Returns:
883 True if context is sufficient, False if backfill is needed
884 """
885 # If no missing tweets, context is sufficient
886 if not missing_tweet_ids:
887 return True
888
889 # If we have a substantial conversation (5+ tweets), likely sufficient
890 if len(tweets) >= 5:
891 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
892 return True
893
894 # If only a few missing tweets and we have some context, might be enough
895 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
896 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
897 return True
898
899 # Check if we have conversational flow (mentions between users)
900 has_conversation_flow = False
901 for tweet in tweets:
902 text = tweet.get('text', '').lower()
903 # Look for mentions, replies, or conversational indicators
904 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1:
905 has_conversation_flow = True
906 break
907
908 # If we have clear conversational flow and reasonable length, sufficient
909 if has_conversation_flow and len(tweets) >= 2:
910 logger.debug("Thread has conversational flow, considering sufficient")
911 return True
912
913 # Otherwise, we need to backfill
914 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
915 return False
916
917def fetch_and_queue_mentions(username: str) -> int:
918 """
919 Single-pass function to fetch new mentions and queue them.
920 Returns number of new mentions found.
921 """
922 try:
923 client = create_x_client()
924
925 # Load last seen ID for incremental fetching
926 last_seen_id = load_last_seen_id()
927
928 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")
929
930 # Search for mentions
931 mentions = client.search_mentions(
932 username=username,
933 since_id=last_seen_id,
934 max_results=100 # Get as many as possible
935 )
936
937 if not mentions:
938 logger.info("No new mentions found")
939 return 0
940
941 # Process mentions (newest first, so reverse to process oldest first)
942 mentions.reverse()
943 new_count = 0
944
945 for mention in mentions:
946 save_mention_to_queue(mention)
947 new_count += 1
948
949 # Update last seen ID to the most recent mention
950 if mentions:
951 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent
952 save_last_seen_id(most_recent_id)
953
954 logger.info(f"Queued {new_count} new X mentions")
955 return new_count
956
957 except Exception as e:
958 logger.error(f"Error fetching and queuing mentions: {e}")
959 return 0
960
961# Simple test function
962def get_my_user_info():
963 """Get the authenticated user's information to find correct user ID."""
964 try:
965 client = create_x_client()
966
967 # Use the /2/users/me endpoint to get authenticated user info
968 endpoint = "/users/me"
969 params = {
970 "user.fields": "id,name,username,description"
971 }
972
973 print("Fetching authenticated user information...")
974 response = client._make_request(endpoint, params=params)
975
976 if response and "data" in response:
977 user_data = response["data"]
978 print(f"✅ Found authenticated user:")
979 print(f" ID: {user_data.get('id')}")
980 print(f" Username: @{user_data.get('username')}")
981 print(f" Name: {user_data.get('name')}")
982 print(f" Description: {user_data.get('description', 'N/A')[:100]}...")
983 print(f"\n🔧 Update your config.yaml with:")
984 print(f" user_id: \"{user_data.get('id')}\"")
985 return user_data
986 else:
987 print("❌ Failed to get user information")
988 print(f"Response: {response}")
989 return None
990
991 except Exception as e:
992 print(f"Error getting user info: {e}")
993 return None
994
995def test_search_mentions():
996 """Test the search-based mention detection."""
997 try:
998 client = create_x_client()
999
1000 # First get our username
1001 user_info = client._make_request("/users/me", params={"user.fields": "username"})
1002 if not user_info or "data" not in user_info:
1003 print("❌ Could not get username")
1004 return
1005
1006 username = user_info["data"]["username"]
1007 print(f"🔍 Searching for mentions of @{username}")
1008
1009 mentions = client.search_mentions(username, max_results=5)
1010
1011 if mentions:
1012 print(f"✅ Found {len(mentions)} mentions via search:")
1013 for mention in mentions:
1014 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
1015 else:
1016 print("No mentions found via search")
1017
1018 except Exception as e:
1019 print(f"Search test failed: {e}")
1020
1021def test_fetch_and_queue():
1022 """Test the single-pass fetch and queue function."""
1023 try:
1024 client = create_x_client()
1025
1026 # Get our username
1027 user_info = client._make_request("/users/me", params={"user.fields": "username"})
1028 if not user_info or "data" not in user_info:
1029 print("❌ Could not get username")
1030 return
1031
1032 username = user_info["data"]["username"]
1033 print(f"🔄 Fetching and queueing mentions for @{username}")
1034
1035 # Show current state
1036 last_seen = load_last_seen_id()
1037 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")
1038
1039 # Fetch and queue
1040 new_count = fetch_and_queue_mentions(username)
1041
1042 if new_count > 0:
1043 print(f"✅ Queued {new_count} new mentions")
1044 print(f"📁 Check ./x_queue/ directory for queued mentions")
1045
1046 # Show updated state
1047 new_last_seen = load_last_seen_id()
1048 print(f"📍 Updated last seen ID: {new_last_seen}")
1049 else:
1050 print("ℹ️ No new mentions to queue")
1051
1052 except Exception as e:
1053 print(f"Fetch and queue test failed: {e}")
1054
1055def test_thread_context():
1056 """Test thread context retrieval from a queued mention."""
1057 try:
1058 import json
1059
1060 # Find a queued mention file
1061 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1062 if not queue_files:
1063 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1064 return
1065
1066 # Read the first mention
1067 mention_file = queue_files[0]
1068 with open(mention_file, 'r') as f:
1069 mention_data = json.load(f)
1070
1071 mention = mention_data['mention']
1072 print(f"📄 Using mention: {mention.get('id')}")
1073 print(f"📝 Text: {mention.get('text')}")
1074
1075 # Check if it has a conversation_id
1076 conversation_id = mention.get('conversation_id')
1077 if not conversation_id:
1078 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
1079 return
1080
1081 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1082
1083 # Get thread context
1084 client = create_x_client()
1085 thread_data = client.get_thread_context(conversation_id)
1086
1087 if thread_data:
1088 tweets = thread_data.get('tweets', [])
1089 print(f"✅ Retrieved thread with {len(tweets)} tweets")
1090
1091 # Convert to YAML
1092 yaml_thread = thread_to_yaml_string(thread_data)
1093
1094 # Save thread context for inspection
1095 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
1096 with open(thread_file, 'w') as f:
1097 f.write(yaml_thread)
1098
1099 print(f"💾 Saved thread context to: {thread_file}")
1100 print("\n📋 Thread preview:")
1101 print(yaml_thread)
1102 else:
1103 print("❌ Failed to retrieve thread context")
1104
1105 except Exception as e:
1106 print(f"Thread context test failed: {e}")
1107
1108def test_letta_integration(agent_id: str = None):
1109 """Test sending X thread context to Letta agent."""
1110 try:
1111 from letta_client import Letta
1112 import json
1113 import yaml
1114
1115 # Load full config to access letta section
1116 try:
1117 with open("config.yaml", 'r') as f:
1118 full_config = yaml.safe_load(f)
1119
1120 letta_config = full_config.get('letta', {})
1121 api_key = letta_config.get('api_key')
1122 config_agent_id = letta_config.get('agent_id')
1123
1124 # Use agent_id from config if not provided as parameter
1125 if not agent_id:
1126 if config_agent_id:
1127 agent_id = config_agent_id
1128 print(f"ℹ️ Using agent_id from config: {agent_id}")
1129 else:
1130 print("❌ No agent_id found in config.yaml")
1131 print("Expected config structure:")
1132 print(" letta:")
1133 print(" agent_id: your-agent-id")
1134 return
1135 else:
1136 print(f"ℹ️ Using provided agent_id: {agent_id}")
1137
1138 if not api_key:
1139 # Try loading from environment as fallback
1140 import os
1141 api_key = os.getenv('LETTA_API_KEY')
1142 if not api_key:
1143 print("❌ LETTA_API_KEY not found in config.yaml or environment")
1144 print("Expected config structure:")
1145 print(" letta:")
1146 print(" api_key: your-letta-api-key")
1147 return
1148 else:
1149 print("ℹ️ Using LETTA_API_KEY from environment")
1150 else:
1151 print("ℹ️ Using LETTA_API_KEY from config.yaml")
1152
1153 except Exception as e:
1154 print(f"❌ Error loading config: {e}")
1155 return
1156
1157 letta_client = Letta(token=api_key, timeout=600)
1158 print(f"🤖 Connected to Letta, using agent: {agent_id}")
1159
1160 # Find a queued mention file
1161 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1162 if not queue_files:
1163 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1164 return
1165
1166 # Read the first mention
1167 mention_file = queue_files[0]
1168 with open(mention_file, 'r') as f:
1169 mention_data = json.load(f)
1170
1171 mention = mention_data['mention']
1172 conversation_id = mention.get('conversation_id')
1173
1174 if not conversation_id:
1175 print("❌ No conversation_id found in mention.")
1176 return
1177
1178 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1179
1180 # Get thread context
1181 x_client = create_x_client()
1182 thread_data = x_client.get_thread_context(conversation_id)
1183
1184 if not thread_data:
1185 print("❌ Failed to retrieve thread context")
1186 return
1187
1188 # Convert to YAML
1189 yaml_thread = thread_to_yaml_string(thread_data)
1190
1191 # Create prompt for the agent
1192 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.
1193
1194Here is the thread context:
1195
1196{yaml_thread}
1197
1198Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""
1199
1200 print(f"📤 Sending thread context to Letta agent...")
1201
1202 # Print the prompt in a rich panel
1203 rprint(Panel(prompt, title="Prompt", border_style="blue"))
1204
1205 # Send to Letta agent using streaming
1206 message_stream = letta_client.agents.messages.create_stream(
1207 agent_id=agent_id,
1208 messages=[{"role": "user", "content": prompt}],
1209 stream_tokens=False,
1210 max_steps=10
1211 )
1212
1213 print("🔄 Streaming response from agent...")
1214 response_text = ""
1215
1216 for chunk in message_stream:
1217 print(chunk)
1218 if hasattr(chunk, 'message_type'):
1219 if chunk.message_type == 'assistant_message':
1220 print(f"🤖 Agent response: {chunk.content}")
1221 response_text = chunk.content
1222 elif chunk.message_type == 'reasoning_message':
1223 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
1224 elif chunk.message_type == 'tool_call_message':
1225 print(f"🔧 Agent tool call: {chunk.tool_call.name}")
1226
1227 if response_text:
1228 print(f"\n✅ Agent generated response:")
1229 print(f"📝 Response: {response_text}")
1230 else:
1231 print("❌ No response generated by agent")
1232
1233 except Exception as e:
1234 print(f"Letta integration test failed: {e}")
1235 import traceback
1236 traceback.print_exc()
1237
1238def test_x_client():
1239 """Test the X client by fetching mentions."""
1240 try:
1241 client = create_x_client()
1242 mentions = client.get_mentions(max_results=5)
1243
1244 if mentions:
1245 print(f"Successfully retrieved {len(mentions)} mentions:")
1246 for mention in mentions:
1247 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
1248 else:
1249 print("No mentions retrieved")
1250
1251 except Exception as e:
1252 print(f"Test failed: {e}")
1253
1254def reply_to_cameron_post():
1255 """
1256 Reply to Cameron's specific X post.
1257
1258 NOTE: This requires OAuth User Context authentication, not Bearer token.
1259 Current Bearer token is Application-Only which can't post.
1260 """
1261 try:
1262 client = create_x_client()
1263
1264 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
1265 cameron_post_id = "1950690566909710618"
1266
1267 # Simple reply message
1268 reply_text = "Hello from void! 🤖 Testing X integration."
1269
1270 print(f"Attempting to reply to post {cameron_post_id}")
1271 print(f"Reply text: {reply_text}")
1272 print("\nNOTE: This will fail with current Bearer token (Application-Only)")
1273 print("Posting requires OAuth User Context authentication")
1274
1275 result = client.post_reply(reply_text, cameron_post_id)
1276
1277 if result:
1278 print(f"✅ Successfully posted reply!")
1279 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
1280 else:
1281 print("❌ Failed to post reply (expected with current auth)")
1282
1283 except Exception as e:
1284 print(f"Reply failed: {e}")
1285
1286def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
1287 """
1288 Process an X mention and generate a reply using the Letta agent.
1289 Similar to bsky.py process_mention but for X/Twitter.
1290
1291 Args:
1292 void_agent: The Letta agent instance
1293 x_client: The X API client
1294 mention_data: The mention data dictionary
1295 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
1296 testing_mode: If True, don't actually post to X
1297
1298 Returns:
1299 True: Successfully processed, remove from queue
1300 False: Failed but retryable, keep in queue
1301 None: Failed with non-retryable error, move to errors directory
1302 "no_reply": No reply was generated, move to no_reply directory
1303 """
1304 try:
1305 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")
1306
1307 # Extract mention details
1308 if isinstance(mention_data, dict):
1309 # Handle both raw mention and queued mention formats
1310 if 'mention' in mention_data:
1311 mention = mention_data['mention']
1312 else:
1313 mention = mention_data
1314 else:
1315 mention = mention_data
1316
1317 mention_id = mention.get('id')
1318 mention_text = mention.get('text', '')
1319 author_id = mention.get('author_id')
1320 conversation_id = mention.get('conversation_id')
1321 in_reply_to_user_id = mention.get('in_reply_to_user_id')
1322 referenced_tweets = mention.get('referenced_tweets', [])
1323
1324 # Check downrank list - only respond to downranked users 10% of the time
1325 downrank_users = load_downrank_users()
1326 if not should_respond_to_downranked_user(str(author_id), downrank_users):
1327 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
1328 return "no_reply"
1329
1330 # Enhanced conversation tracking for debug - especially important for Grok handling
1331 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
1332 logger.info(f" Author ID: {author_id}")
1333 logger.info(f" Conversation ID: {conversation_id}")
1334 logger.info(f" In Reply To User ID: {in_reply_to_user_id}")
1335 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items")
1336 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets
1337 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
1338 logger.info(f" Text preview: {mention_text[:100]}...")
1339
1340 if not conversation_id:
1341 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
1342 return None
1343
1344 # Get thread context with caching enabled for efficiency
1345 # Use mention_id as until_id to exclude tweets that occurred after this mention
1346 try:
1347 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
1348 if not thread_data:
1349 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
1350 return False
1351
1352 # If this mention references a specific tweet, ensure we have that tweet in context
1353 if referenced_tweets:
1354 for ref in referenced_tweets:
1355 if ref.get('type') == 'replied_to':
1356 ref_id = ref.get('id')
1357 # Check if the referenced tweet is in our thread data
1358 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
1359 if ref_id and ref_id not in thread_tweet_ids:
1360 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
1361 try:
1362 # Fetch the missing referenced tweet directly
1363 endpoint = f"/tweets/{ref_id}"
1364 params = {
1365 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
1366 "user.fields": "id,name,username",
1367 "expansions": "author_id"
1368 }
1369 response = x_client._make_request(endpoint, params)
1370 if response and "data" in response:
1371 missing_tweet = response["data"]
1372 if missing_tweet.get('conversation_id') == conversation_id:
1373 # Add to thread data
1374 if 'tweets' not in thread_data:
1375 thread_data['tweets'] = []
1376 thread_data['tweets'].append(missing_tweet)
1377
1378 # Add user data if available
1379 if "includes" in response and "users" in response["includes"]:
1380 if 'users' not in thread_data:
1381 thread_data['users'] = {}
1382 for user in response["includes"]["users"]:
1383 thread_data['users'][user["id"]] = user
1384
1385 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
1386 else:
1387 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
1388 except Exception as e:
1389 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")
1390
1391 # Enhanced thread context debugging
1392 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
1393 thread_posts = thread_data.get('tweets', [])
1394 thread_users = thread_data.get('users', {})
1395 logger.info(f" Posts in thread: {len(thread_posts)}")
1396 logger.info(f" Users in thread: {len(thread_users)}")
1397
1398 # Log thread participants for Grok detection
1399 for user_id, user_info in thread_users.items():
1400 username = user_info.get('username', 'unknown')
1401 name = user_info.get('name', 'Unknown')
1402 is_verified = user_info.get('verified', False)
1403 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}")
1404
1405 # Special logging for Grok or AI-related users
1406 if 'grok' in username.lower() or 'grok' in name.lower():
1407 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})")
1408
1409 # Log conversation structure
1410 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts
1411 post_id = post.get('id')
1412 post_author = post.get('author_id')
1413 post_text = post.get('text', '')[:50]
1414 is_reply = 'in_reply_to_user_id' in post
1415 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")
1416
1417 except Exception as e:
1418 logger.error(f"❌ Error getting thread context: {e}")
1419 return False
1420
1421 # Convert to YAML string
1422 thread_context = thread_to_yaml_string(thread_data)
1423 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")
1424
1425 # Save comprehensive conversation data for debugging
1426 try:
1427 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1428 debug_dir.mkdir(parents=True, exist_ok=True)
1429
1430 # Save raw thread data (JSON)
1431 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
1432 json.dump(thread_data, f, indent=2)
1433
1434 # Save YAML thread context
1435 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
1436 f.write(thread_context)
1437
1438 # Save mention processing debug info
1439 debug_info = {
1440 'processed_at': datetime.now().isoformat(),
1441 'mention_id': mention_id,
1442 'conversation_id': conversation_id,
1443 'author_id': author_id,
1444 'in_reply_to_user_id': in_reply_to_user_id,
1445 'referenced_tweets': referenced_tweets,
1446 'thread_stats': {
1447 'total_posts': len(thread_posts),
1448 'total_users': len(thread_users),
1449 'yaml_length': len(thread_context)
1450 },
1451 'users_in_conversation': {
1452 user_id: {
1453 'username': user_info.get('username'),
1454 'name': user_info.get('name'),
1455 'verified': user_info.get('verified', False),
1456 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
1457 }
1458 for user_id, user_info in thread_users.items()
1459 }
1460 }
1461
1462 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
1463 json.dump(debug_info, f, indent=2)
1464
1465 logger.info(f"💾 Saved conversation debug data to: {debug_dir}")
1466
1467 except Exception as debug_error:
1468 logger.warning(f"Failed to save debug data: {debug_error}")
1469 # Continue processing even if debug save fails
1470
1471 # Check for #voidstop
1472 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
1473 logger.info("Found #voidstop, skipping this mention")
1474 return True
1475
1476 # Ensure X user blocks are attached
1477 try:
1478 ensure_x_user_blocks_attached(thread_data, void_agent.id)
1479 except Exception as e:
1480 logger.warning(f"Failed to ensure X user blocks: {e}")
1481 # Continue without user blocks rather than failing completely
1482
1483 # Create prompt for Letta agent
1484 author_info = thread_data.get('users', {}).get(author_id, {})
1485 author_username = author_info.get('username', 'unknown')
1486 author_name = author_info.get('name', author_username)
1487
1488 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).
1489
1490MOST RECENT POST (the mention you're responding to):
1491"{mention_text}"
1492
1493FULL THREAD CONTEXT:
1494```yaml
1495{thread_context}
1496```
1497
1498The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.
1499
1500If you need to update user information, use the x_user_* tools.
1501
1502To reply, use the add_post_to_x_thread tool:
1503- Each call creates one post (max 280 characters)
1504- For most responses, a single call is sufficient
1505- Only use multiple calls for threaded replies when:
1506 * The topic requires extended explanation that cannot fit in 280 characters
1507 * You're explicitly asked for a detailed/long response
1508 * The conversation naturally benefits from a structured multi-part answer
1509- Avoid unnecessary threads - be concise when possible"""
1510
1511 # Log mention processing
1512 title = f"X MENTION FROM @{author_username}"
1513 print(f"\n▶ {title}")
1514 print(f" {'═' * len(title)}")
1515 for line in mention_text.split('\n'):
1516 print(f" {line}")
1517
1518 # Send to Letta agent
1519 from config_loader import get_letta_config
1520 from letta_client import Letta
1521
1522 config = get_letta_config()
1523 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
1524
1525 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars")
1526
1527 try:
1528 # Use streaming to avoid timeout errors
1529 message_stream = letta_client.agents.messages.create_stream(
1530 agent_id=void_agent.id,
1531 messages=[{"role": "user", "content": prompt}],
1532 stream_tokens=False,
1533 max_steps=100
1534 )
1535
1536 # Collect streaming response (simplified version of bsky.py logic)
1537 all_messages = []
1538 for chunk in message_stream:
1539 if hasattr(chunk, 'message_type'):
1540 if chunk.message_type == 'reasoning_message':
1541 print("\n◆ Reasoning")
1542 print(" ─────────")
1543 for line in chunk.reasoning.split('\n'):
1544 print(f" {line}")
1545 elif chunk.message_type == 'tool_call_message':
1546 tool_name = chunk.tool_call.name
1547 if tool_name == 'add_post_to_x_thread':
1548 try:
1549 args = json.loads(chunk.tool_call.arguments)
1550 text = args.get('text', '')
1551 if text:
1552 print("\n✎ X Post")
1553 print(" ────────")
1554 for line in text.split('\n'):
1555 print(f" {line}")
1556 except:
1557 pass
1558 elif tool_name == 'halt_activity':
1559 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
1560 if queue_filepath and queue_filepath.exists():
1561 queue_filepath.unlink()
1562 logger.info(f"Deleted queue file: {queue_filepath.name}")
1563 logger.info("=== X BOT TERMINATED BY AGENT ===")
1564 exit(0)
1565 elif chunk.message_type == 'tool_return_message':
1566 tool_name = chunk.name
1567 status = chunk.status
1568 if status == 'success' and tool_name == 'add_post_to_x_thread':
1569 print("\n✓ X Post Queued")
1570 print(" ──────────────")
1571 print(" Post queued successfully")
1572 elif chunk.message_type == 'assistant_message':
1573 print("\n▶ Assistant Response")
1574 print(" ──────────────────")
1575 for line in chunk.content.split('\n'):
1576 print(f" {line}")
1577
1578 all_messages.append(chunk)
1579 if str(chunk) == 'done':
1580 break
1581
1582 # Convert streaming response for compatibility
1583 message_response = type('StreamingResponse', (), {
1584 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
1585 })()
1586
1587 except Exception as api_error:
1588 logger.error(f"Letta API error: {api_error}")
1589 raise
1590
1591 # Extract successful add_post_to_x_thread tool calls
1592 reply_candidates = []
1593 tool_call_results = {}
1594 ignored_notification = False
1595 ack_note = None # Track any note from annotate_ack tool
1596
1597 # First pass: collect tool return statuses
1598 for message in message_response.messages:
1599 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
1600 if message.name == 'add_post_to_x_thread':
1601 tool_call_results[message.tool_call_id] = message.status
1602 elif message.name == 'ignore_notification':
1603 if message.status == 'success':
1604 ignored_notification = True
1605 logger.info("🚫 X notification ignored")
1606
1607 # Second pass: collect successful tool calls
1608 for message in message_response.messages:
1609 if hasattr(message, 'tool_call') and message.tool_call:
1610 # Collect annotate_ack tool calls
1611 if message.tool_call.name == 'annotate_ack':
1612 try:
1613 args = json.loads(message.tool_call.arguments)
1614 note = args.get('note', '')
1615 if note:
1616 ack_note = note
1617 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
1618 except json.JSONDecodeError as e:
1619 logger.error(f"Failed to parse annotate_ack arguments: {e}")
1620
1621 # Collect add_post_to_x_thread tool calls - only if they were successful
1622 elif message.tool_call.name == 'add_post_to_x_thread':
1623 tool_call_id = message.tool_call.tool_call_id
1624 tool_status = tool_call_results.get(tool_call_id, 'unknown')
1625
1626 if tool_status == 'success':
1627 try:
1628 args = json.loads(message.tool_call.arguments)
1629 reply_text = args.get('text', '')
1630 if reply_text:
1631 reply_candidates.append(reply_text)
1632 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
1633 except json.JSONDecodeError as e:
1634 logger.error(f"Failed to parse tool call arguments: {e}")
1635
1636 # Save agent response data to debug folder
1637 try:
1638 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1639
1640 # Save complete agent interaction
1641 agent_response_data = {
1642 'processed_at': datetime.now().isoformat(),
1643 'mention_id': mention_id,
1644 'conversation_id': conversation_id,
1645 'prompt_sent': prompt,
1646 'reply_candidates': reply_candidates,
1647 'ignored_notification': ignored_notification,
1648 'ack_note': ack_note,
1649 'tool_call_results': tool_call_results,
1650 'all_messages': []
1651 }
1652
1653 # Convert messages to serializable format
1654 for message in message_response.messages:
1655 msg_data = {
1656 'message_type': getattr(message, 'message_type', 'unknown'),
1657 'content': getattr(message, 'content', ''),
1658 'reasoning': getattr(message, 'reasoning', ''),
1659 'status': getattr(message, 'status', ''),
1660 'name': getattr(message, 'name', ''),
1661 }
1662
1663 if hasattr(message, 'tool_call') and message.tool_call:
1664 msg_data['tool_call'] = {
1665 'name': message.tool_call.name,
1666 'arguments': message.tool_call.arguments,
1667 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
1668 }
1669
1670 agent_response_data['all_messages'].append(msg_data)
1671
1672 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
1673 json.dump(agent_response_data, f, indent=2)
1674
1675 logger.info(f"💾 Saved agent response debug data")
1676
1677 except Exception as debug_error:
1678 logger.warning(f"Failed to save agent response debug data: {debug_error}")
1679
1680 # Handle conflicts
1681 if reply_candidates and ignored_notification:
1682 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
1683 return False
1684
1685 if reply_candidates:
1686 # Post replies to X
1687 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")
1688
1689 if len(reply_candidates) == 1:
1690 content = reply_candidates[0]
1691 title = f"Reply to @{author_username}"
1692 else:
1693 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
1694 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"
1695
1696 print(f"\n✎ {title}")
1697 print(f" {'─' * len(title)}")
1698 for line in content.split('\n'):
1699 print(f" {line}")
1700
1701 if testing_mode:
1702 logger.info("TESTING MODE: Skipping actual X post")
1703 return True
1704 else:
1705 # Post to X using thread approach
1706 success = post_x_thread_replies(x_client, mention_id, reply_candidates)
1707 if success:
1708 logger.info(f"Successfully replied to @{author_username} on X")
1709
1710 # Acknowledge the post we're replying to
1711 try:
1712 ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
1713 if ack_result:
1714 if ack_note:
1715 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
1716 else:
1717 logger.info(f"Successfully acknowledged X post from @{author_username}")
1718 else:
1719 logger.warning(f"Failed to acknowledge X post from @{author_username}")
1720 except Exception as e:
1721 logger.error(f"Error acknowledging X post from @{author_username}: {e}")
1722 # Don't fail the entire operation if acknowledgment fails
1723
1724 return True
1725 else:
1726 logger.error(f"Failed to send reply to @{author_username} on X")
1727 return False
1728 else:
1729 if ignored_notification:
1730 logger.info(f"X mention from @{author_username} was explicitly ignored")
1731 return "ignored"
1732 else:
1733 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
1734 return False # Keep in queue for retry instead of removing
1735
1736 except Exception as e:
1737 logger.error(f"Error processing X mention: {e}")
1738 return False
1739
1740def acknowledge_x_post(x_client, post_id, note=None):
1741 """
1742 Acknowledge an X post that we replied to.
1743 Uses the same Bluesky client and uploads to the void data repository on atproto,
1744 just like Bluesky acknowledgments.
1745
1746 Args:
1747 x_client: XClient instance (not used, kept for compatibility)
1748 post_id: The X post ID we're acknowledging
1749 note: Optional note to include with the acknowledgment
1750
1751 Returns:
1752 True if successful, False otherwise
1753 """
1754 try:
1755 # Use Bluesky client to upload acks to the void data repository on atproto
1756 bsky_client = bsky_utils.default_login()
1757
1758 # Create a synthetic URI and CID for the X post
1759 # X posts don't have atproto URIs/CIDs, so we create identifiers
1760 post_uri = f"x://twitter.com/post/{post_id}"
1761 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts
1762
1763 # Use the same acknowledge_post function as Bluesky
1764 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)
1765
1766 if ack_result:
1767 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
1768 return True
1769 else:
1770 logger.error(f"Failed to acknowledge X post {post_id}")
1771 return False
1772
1773 except Exception as e:
1774 logger.error(f"Error acknowledging X post {post_id}: {e}")
1775 return False
1776
1777def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
1778 """
1779 Post a series of replies to X, threading them properly.
1780
1781 Args:
1782 x_client: XClient instance
1783 in_reply_to_tweet_id: The original tweet ID to reply to
1784 reply_messages: List of reply text strings
1785
1786 Returns:
1787 True if successful, False otherwise
1788 """
1789 try:
1790 current_reply_id = in_reply_to_tweet_id
1791
1792 for i, reply_text in enumerate(reply_messages):
1793 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")
1794
1795 result = x_client.post_reply(reply_text, current_reply_id)
1796
1797 if result and 'data' in result:
1798 new_tweet_id = result['data']['id']
1799 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
1800 # For threading, the next reply should reply to this one
1801 current_reply_id = new_tweet_id
1802 else:
1803 logger.error(f"Failed to post X reply {i+1}")
1804 return False
1805
1806 return True
1807
1808 except Exception as e:
1809 logger.error(f"Error posting X thread replies: {e}")
1810 return False
1811
1812def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
1813 """
1814 Load and process all X mentions from the queue.
1815 Similar to bsky.py load_and_process_queued_notifications but for X.
1816 """
1817 try:
1818 # Get all X mention files in queue directory
1819 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1820
1821 if not queue_files:
1822 return
1823
1824 # Load file metadata and sort by creation time (chronological order)
1825 file_metadata = []
1826 for filepath in queue_files:
1827 try:
1828 with open(filepath, 'r') as f:
1829 queue_data = json.load(f)
1830 mention_data = queue_data.get('mention', queue_data)
1831 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing
1832 file_metadata.append((created_at, filepath))
1833 except Exception as e:
1834 logger.warning(f"Error reading queue file {filepath.name}: {e}")
1835 # Add with default timestamp so it still gets processed
1836 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))
1837
1838 # Sort by creation time (oldest first)
1839 file_metadata.sort(key=lambda x: x[0])
1840
1841 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")
1842
1843 for i, (created_at, filepath) in enumerate(file_metadata, 1):
1844 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")
1845
1846 try:
1847 # Load mention data
1848 with open(filepath, 'r') as f:
1849 queue_data = json.load(f)
1850
1851 mention_data = queue_data.get('mention', queue_data)
1852
1853 # Process the mention
1854 success = process_x_mention(void_agent, x_client, mention_data,
1855 queue_filepath=filepath, testing_mode=testing_mode)
1856
1857 except XRateLimitError:
1858 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
1859 break
1860
1861 except Exception as e:
1862 logger.error(f"Error processing X queue file {filepath.name}: {e}")
1863 continue
1864
1865 # Handle file based on processing result
1866 if success:
1867 if testing_mode:
1868 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
1869 else:
1870 filepath.unlink()
1871 logger.info(f"Successfully processed and removed X file: {filepath.name}")
1872
1873 # Mark as processed
1874 processed_mentions = load_processed_mentions()
1875 processed_mentions.add(mention_data.get('id'))
1876 save_processed_mentions(processed_mentions)
1877
1878 elif success is None: # Move to error directory
1879 error_dir = X_QUEUE_DIR / "errors"
1880 error_dir.mkdir(exist_ok=True)
1881 error_path = error_dir / filepath.name
1882 filepath.rename(error_path)
1883 logger.warning(f"Moved X file {filepath.name} to errors directory")
1884
1885 elif success == "no_reply": # Move to no_reply directory
1886 no_reply_dir = X_QUEUE_DIR / "no_reply"
1887 no_reply_dir.mkdir(exist_ok=True)
1888 no_reply_path = no_reply_dir / filepath.name
1889 filepath.rename(no_reply_path)
1890 logger.info(f"Moved X file {filepath.name} to no_reply directory")
1891
1892 elif success == "ignored": # Delete ignored notifications
1893 filepath.unlink()
1894 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")
1895
1896 else:
1897 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")
1898
1899 except Exception as e:
1900 logger.error(f"Error loading queued X mentions: {e}")
1901
1902def process_x_notifications(void_agent, x_client, testing_mode=False):
1903 """
1904 Fetch new X mentions, queue them, and process the queue.
1905 Similar to bsky.py process_notifications but for X.
1906 """
1907 try:
1908 # Get username for fetching mentions
1909 user_info = x_client._make_request("/users/me", params={"user.fields": "username"})
1910 if not user_info or "data" not in user_info:
1911 logger.error("Could not get username for X mentions")
1912 return
1913
1914 username = user_info["data"]["username"]
1915
1916 # Fetch and queue new mentions
1917 new_count = fetch_and_queue_mentions(username)
1918
1919 if new_count > 0:
1920 logger.info(f"Found {new_count} new X mentions to process")
1921
1922 # Process the entire queue
1923 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
1924
1925 except Exception as e:
1926 logger.error(f"Error processing X notifications: {e}")
1927
1928def initialize_x_void():
1929 """Initialize the void agent for X operations."""
1930 logger.info("Starting void agent initialization for X...")
1931
1932 from config_loader import get_letta_config
1933 from letta_client import Letta
1934
1935 # Get config
1936 config = get_letta_config()
1937 client = Letta(token=config['api_key'], timeout=config['timeout'])
1938 agent_id = config['agent_id']
1939
1940 try:
1941 void_agent = client.agents.retrieve(agent_id=agent_id)
1942 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
1943 except Exception as e:
1944 logger.error(f"Failed to load void agent {agent_id}: {e}")
1945 raise e
1946
1947 # Ensure correct tools are attached for X
1948 logger.info("Configuring tools for X platform...")
1949 try:
1950 from tool_manager import ensure_platform_tools
1951 ensure_platform_tools('x', void_agent.id)
1952 except Exception as e:
1953 logger.error(f"Failed to configure platform tools: {e}")
1954 logger.warning("Continuing with existing tool configuration")
1955
1956 # Log agent details
1957 logger.info(f"X Void agent details - ID: {void_agent.id}")
1958 logger.info(f"Agent name: {void_agent.name}")
1959
1960 return void_agent
1961
1962def x_main_loop(testing_mode=False):
1963 """
1964 Main X bot loop that continuously monitors for mentions and processes them.
1965 Similar to bsky.py main() but for X/Twitter.
1966 """
1967 import time
1968 from time import sleep
1969
1970 logger.info("=== STARTING X VOID BOT ===")
1971
1972 # Initialize void agent
1973 void_agent = initialize_x_void()
1974 logger.info(f"X void agent initialized: {void_agent.id}")
1975
1976 # Initialize X client
1977 x_client = create_x_client()
1978 logger.info("Connected to X API")
1979
1980 # Main loop
1981 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls)
1982 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")
1983
1984 if testing_mode:
1985 logger.info("=== RUNNING IN X TESTING MODE ===")
1986 logger.info(" - No messages will be sent to X")
1987 logger.info(" - Queue files will not be deleted")
1988
1989 cycle_count = 0
1990 start_time = time.time()
1991
1992 while True:
1993 try:
1994 cycle_count += 1
1995 logger.info(f"=== X CYCLE {cycle_count} ===")
1996
1997 # Process X notifications (fetch, queue, and process)
1998 process_x_notifications(void_agent, x_client, testing_mode)
1999
2000 # Log cycle completion
2001 elapsed_time = time.time() - start_time
2002 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")
2003
2004 sleep(FETCH_DELAY_SEC)
2005
2006 except KeyboardInterrupt:
2007 elapsed_time = time.time() - start_time
2008 logger.info("=== X BOT STOPPED BY USER ===")
2009 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
2010 break
2011 except Exception as e:
2012 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
2013 logger.error(f"Error details: {e}")
2014 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
2015 sleep(FETCH_DELAY_SEC * 2)
2016
2017def process_queue_only(testing_mode=False):
2018 """
2019 Process all queued X mentions without fetching new ones.
2020 Useful for rate limit management - queue first, then process separately.
2021
2022 Args:
2023 testing_mode: If True, don't actually post to X and keep queue files
2024 """
2025 logger.info("=== PROCESSING X QUEUE ONLY ===")
2026
2027 if testing_mode:
2028 logger.info("=== RUNNING IN X TESTING MODE ===")
2029 logger.info(" - No messages will be sent to X")
2030 logger.info(" - Queue files will not be deleted")
2031
2032 try:
2033 # Initialize void agent
2034 void_agent = initialize_x_void()
2035 logger.info(f"X void agent initialized: {void_agent.id}")
2036
2037 # Initialize X client
2038 x_client = create_x_client()
2039 logger.info("Connected to X API")
2040
2041 # Process the queue without fetching new mentions
2042 logger.info("Processing existing X queue...")
2043 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
2044
2045 logger.info("=== X QUEUE PROCESSING COMPLETE ===")
2046
2047 except Exception as e:
2048 logger.error(f"Error processing X queue: {e}")
2049 raise
2050
2051def x_notification_loop():
2052 """
2053 DEPRECATED: Old X notification loop using search-based mention detection.
2054 Use x_main_loop() instead for the full bot experience.
2055 """
2056 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
2057 x_main_loop()
2058
2059if __name__ == "__main__":
2060 import sys
2061 import argparse
2062
2063 if len(sys.argv) > 1:
2064 if sys.argv[1] == "bot":
2065 # Main bot with optional --test flag
2066 parser = argparse.ArgumentParser(description='X Void Bot')
2067 parser.add_argument('command', choices=['bot'])
2068 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
2069 args = parser.parse_args()
2070 x_main_loop(testing_mode=args.test)
2071 elif sys.argv[1] == "loop":
2072 x_notification_loop()
2073 elif sys.argv[1] == "reply":
2074 reply_to_cameron_post()
2075 elif sys.argv[1] == "me":
2076 get_my_user_info()
2077 elif sys.argv[1] == "search":
2078 test_search_mentions()
2079 elif sys.argv[1] == "queue":
2080 test_fetch_and_queue()
2081 elif sys.argv[1] == "thread":
2082 test_thread_context()
2083 elif sys.argv[1] == "process":
2084 # Process all queued mentions with optional --test flag
2085 testing_mode = "--test" in sys.argv
2086 process_queue_only(testing_mode=testing_mode)
2087 elif sys.argv[1] == "letta":
2088 # Use specific agent ID if provided, otherwise use from config
2089 agent_id = sys.argv[2] if len(sys.argv) > 2 else None
2090 test_letta_integration(agent_id)
2091 elif sys.argv[1] == "downrank":
2092 # View or manage downrank list
2093 if len(sys.argv) > 2 and sys.argv[2] == "list":
2094 downrank_users = load_downrank_users()
2095 if downrank_users:
2096 print(f"📋 Downrank users ({len(downrank_users)} total):")
2097 for user_id in sorted(downrank_users):
2098 print(f" - {user_id}")
2099 else:
2100 print("📋 No downrank users configured")
2101 else:
2102 print("Usage: python x.py downrank list")
2103 print(" list - Show all downranked user IDs")
2104 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list")
2105 else:
2106 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
2107 print(" bot - Run the main X bot (use --test for testing mode)")
2108 print(" Example: python x.py bot --test")
2109 print(" queue - Fetch and queue mentions only (no processing)")
2110 print(" process - Process all queued mentions only (no fetching)")
2111 print(" Example: python x.py process --test")
2112 print(" downrank - Manage downrank users (10% response rate)")
2113 print(" Example: python x.py downrank list")
2114 print(" loop - Run the old notification monitoring loop (deprecated)")
2115 print(" reply - Reply to Cameron's specific post")
2116 print(" me - Get authenticated user info and correct user ID")
2117 print(" search - Test search-based mention detection")
2118 print(" thread - Test thread context retrieval from queued mention")
2119 print(" letta - Test sending thread context to Letta agent")
2120 print(" Optional: python x.py letta <agent-id>")
2121 else:
2122 test_x_client()