a digital person for bluesky
1import os
2import logging
3import requests
4import yaml
5import json
6import hashlib
7import random
8import time
9from typing import Optional, Dict, Any, List, Set
10from datetime import datetime
11from pathlib import Path
12from requests_oauthlib import OAuth1
13from rich import print as rprint
14from rich.panel import Panel
15from rich.text import Text
16
17import bsky_utils
18
19class XRateLimitError(Exception):
20 """Exception raised when X API rate limit is exceeded"""
21 pass
22
23
24# Configure logging
25logging.basicConfig(
26 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
27)
28logger = logging.getLogger("x_client")
29
30# X-specific file paths
31X_QUEUE_DIR = Path("x_queue")
32X_CACHE_DIR = Path("x_cache")
33X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
34X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
35X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")
36
37class XClient:
38 """X (Twitter) API client for fetching mentions and managing interactions."""
39
40 def __init__(self, api_key: str, user_id: str, access_token: str = None,
41 consumer_key: str = None, consumer_secret: str = None,
42 access_token_secret: str = None):
43 self.api_key = api_key
44 self.access_token = access_token
45 self.user_id = user_id
46 self.base_url = "https://api.x.com/2"
47
48 # Check if we have OAuth 1.0a credentials
49 if (consumer_key and consumer_secret and access_token and access_token_secret):
50 # Use OAuth 1.0a for User Context
51 self.oauth = OAuth1(
52 consumer_key,
53 client_secret=consumer_secret,
54 resource_owner_key=access_token,
55 resource_owner_secret=access_token_secret
56 )
57 self.headers = {"Content-Type": "application/json"}
58 self.auth_method = "oauth1a"
59 logger.info("Using OAuth 1.0a User Context authentication for X API")
60 elif access_token:
61 # Use OAuth 2.0 Bearer token for User Context
62 self.oauth = None
63 self.headers = {
64 "Authorization": f"Bearer {access_token}",
65 "Content-Type": "application/json"
66 }
67 self.auth_method = "oauth2_user"
68 logger.info("Using OAuth 2.0 User Context access token for X API")
69 else:
70 # Use Application-Only Bearer token
71 self.oauth = None
72 self.headers = {
73 "Authorization": f"Bearer {api_key}",
74 "Content-Type": "application/json"
75 }
76 self.auth_method = "bearer"
77 logger.info("Using Application-Only Bearer token for X API")
78
79 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
80 """Make a request to the X API with proper error handling and exponential backoff."""
81 url = f"{self.base_url}{endpoint}"
82
83 for attempt in range(max_retries):
84 try:
85 if method.upper() == "GET":
86 if self.oauth:
87 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
88 else:
89 response = requests.get(url, headers=self.headers, params=params)
90 elif method.upper() == "POST":
91 if self.oauth:
92 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
93 else:
94 response = requests.post(url, headers=self.headers, json=data)
95 else:
96 raise ValueError(f"Unsupported HTTP method: {method}")
97
98 response.raise_for_status()
99 return response.json()
100
101 except requests.exceptions.HTTPError as e:
102 if response.status_code == 401:
103 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
104 logger.error(f"Response: {response.text}")
105 return None # Don't retry auth failures
106 elif response.status_code == 403:
107 logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
108 logger.error(f"Response: {response.text}")
109 return None # Don't retry permission failures
110 elif response.status_code == 429:
111 if attempt < max_retries - 1:
112 # Exponential backoff: 60s, 120s, 240s
113 backoff_time = 60 * (2 ** attempt)
114 logger.warning(f"X API rate limit exceeded (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
115 logger.error(f"Response: {response.text}")
116 time.sleep(backoff_time)
117 continue
118 else:
119 logger.error("X API rate limit exceeded - max retries reached")
120 logger.error(f"Response: {response.text}")
121 raise XRateLimitError("X API rate limit exceeded")
122 else:
123 if attempt < max_retries - 1:
124 # Exponential backoff for other HTTP errors too
125 backoff_time = 30 * (2 ** attempt)
126 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
127 logger.error(f"Response: {response.text}")
128 time.sleep(backoff_time)
129 continue
130 else:
131 logger.error(f"X API request failed after {max_retries} attempts: {e}")
132 logger.error(f"Response: {response.text}")
133 return None
134
135 except Exception as e:
136 if attempt < max_retries - 1:
137 backoff_time = 15 * (2 ** attempt)
138 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
139 time.sleep(backoff_time)
140 continue
141 else:
142 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
143 return None
144
145 return None
146
147 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[Dict]:
148 """
149 Fetch mentions for the configured user with user data.
150
151 Args:
152 since_id: Minimum Post ID to include (for getting newer mentions)
153 max_results: Number of results to return (5-100)
154
155 Returns:
156 Dict with 'mentions' and 'users' keys, or None if request failed
157 """
158 endpoint = f"/users/{self.user_id}/mentions"
159 params = {
160 "max_results": min(max(max_results, 5), 100), # Ensure within API limits
161 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
162 "user.fields": "id,name,username",
163 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
164 }
165
166 if since_id:
167 params["since_id"] = since_id
168
169 logger.info(f"Fetching mentions for user {self.user_id}")
170 response = self._make_request(endpoint, params)
171
172 if response:
173 logger.debug(f"X API response: {response}")
174
175 if response and "data" in response:
176 mentions = response["data"]
177 users_data = {}
178
179 # Extract user data from includes
180 if "includes" in response and "users" in response["includes"]:
181 for user in response["includes"]["users"]:
182 users_data[user["id"]] = user
183 logger.info(f"Retrieved user data for {len(users_data)} users")
184
185 logger.info(f"Retrieved {len(mentions)} mentions")
186 return {"mentions": mentions, "users": users_data}
187 else:
188 if response:
189 logger.info(f"No mentions in response. Full response: {response}")
190 else:
191 logger.warning("Request failed - no response received")
192 return {"mentions": [], "users": {}}
193
194 def get_user_info(self, user_id: str) -> Optional[Dict]:
195 """Get information about a specific user."""
196 endpoint = f"/users/{user_id}"
197 params = {
198 "user.fields": "id,name,username,description,public_metrics"
199 }
200
201 response = self._make_request(endpoint, params)
202 return response.get("data") if response else None
203
204 def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]:
205 """
206 Search for mentions using the search endpoint instead of mentions endpoint.
207 This might have better rate limits than the direct mentions endpoint.
208
209 Args:
210 username: Username to search for mentions of (without @)
211 max_results: Number of results to return (10-100)
212 since_id: Only return results newer than this tweet ID
213
214 Returns:
215 List of tweets mentioning the username
216 """
217 endpoint = "/tweets/search/recent"
218
219 # Search for mentions of the username
220 query = f"@{username}"
221
222 params = {
223 "query": query,
224 "max_results": min(max(max_results, 10), 100),
225 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
226 "user.fields": "id,name,username",
227 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
228 }
229
230 if since_id:
231 params["since_id"] = since_id
232
233 logger.info(f"Searching for mentions of @{username}")
234 response = self._make_request(endpoint, params)
235
236 if response and "data" in response:
237 tweets = response["data"]
238 logger.info(f"Found {len(tweets)} mentions via search")
239 return tweets
240 else:
241 if response:
242 logger.info(f"No mentions found via search. Response: {response}")
243 else:
244 logger.warning("Search request failed")
245 return []
246
247 def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[List[Dict]]:
248 """
249 Get all tweets in a conversation thread up to a specific tweet ID.
250
251 Args:
252 conversation_id: The conversation ID to fetch (should be the original tweet ID)
253 use_cache: Whether to use cached data if available
254 until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)
255
256 Returns:
257 List of tweets in the conversation, ordered chronologically
258 """
259 # Check cache first if enabled
260 if use_cache:
261 cached_data = get_cached_thread_context(conversation_id)
262 if cached_data:
263 return cached_data
264
265 # First, get the original tweet directly since it might not appear in conversation search
266 original_tweet = None
267 try:
268 endpoint = f"/tweets/{conversation_id}"
269 params = {
270 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
271 "user.fields": "id,name,username",
272 "expansions": "author_id"
273 }
274 response = self._make_request(endpoint, params)
275 if response and "data" in response:
276 original_tweet = response["data"]
277 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
278 except Exception as e:
279 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")
280
281 # Then search for all tweets in this conversation
282 endpoint = "/tweets/search/recent"
283 params = {
284 "query": f"conversation_id:{conversation_id}",
285 "max_results": 100, # Get as many as possible
286 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
287 "user.fields": "id,name,username",
288 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
289 "sort_order": "recency" # Get newest first, we'll reverse later
290 }
291
292 # Add until_id parameter to exclude tweets after the mention being processed
293 if until_id:
294 params["until_id"] = until_id
295 logger.info(f"Using until_id={until_id} to exclude future tweets")
296
297 logger.info(f"Fetching thread context for conversation {conversation_id}")
298 response = self._make_request(endpoint, params)
299
300 tweets = []
301 users_data = {}
302
303 # Collect tweets from search
304 if response and "data" in response:
305 tweets.extend(response["data"])
306 # Store user data for reference
307 if "includes" in response and "users" in response["includes"]:
308 for user in response["includes"]["users"]:
309 users_data[user["id"]] = user
310
311 # Add original tweet if we got it and it's not already in the list
312 if original_tweet:
313 tweet_ids = [t.get('id') for t in tweets]
314 if original_tweet.get('id') not in tweet_ids:
315 tweets.append(original_tweet)
316 logger.info("Added original tweet to thread context")
317
318 # Attempt to fill gaps by fetching referenced tweets that are missing
319 # This helps with X API's incomplete conversation search results
320 tweet_ids = set(t.get('id') for t in tweets)
321 missing_tweet_ids = set()
322 critical_missing_ids = set()
323
324 # Collect referenced tweet IDs, prioritizing critical ones
325 for tweet in tweets:
326 referenced_tweets = tweet.get('referenced_tweets', [])
327 for ref in referenced_tweets:
328 ref_id = ref.get('id')
329 ref_type = ref.get('type')
330 if ref_id and ref_id not in tweet_ids:
331 missing_tweet_ids.add(ref_id)
332 # Prioritize direct replies and quoted tweets over retweets
333 if ref_type in ['replied_to', 'quoted']:
334 critical_missing_ids.add(ref_id)
335
336 # For rate limit efficiency, only fetch critical missing tweets if we have many
337 if len(missing_tweet_ids) > 10:
338 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
339 missing_tweet_ids = critical_missing_ids
340
341 # Context sufficiency check - skip backfill if we already have enough context
342 if has_sufficient_context(tweets, missing_tweet_ids):
343 logger.info("Thread has sufficient context, skipping missing tweet backfill")
344 missing_tweet_ids = set()
345
346 # Fetch missing referenced tweets in batches (more rate-limit friendly)
347 if missing_tweet_ids:
348 missing_list = list(missing_tweet_ids)
349
350 # First, check cache for missing tweets
351 cached_tweets = get_cached_tweets(missing_list)
352 for tweet_id, cached_tweet in cached_tweets.items():
353 if cached_tweet.get('conversation_id') == conversation_id:
354 tweets.append(cached_tweet)
355 tweet_ids.add(tweet_id)
356 logger.info(f"Retrieved missing tweet from cache: {tweet_id}")
357
358 # Add user data if available in cache
359 if cached_tweet.get('author_info'):
360 author_id = cached_tweet.get('author_id')
361 if author_id:
362 users_data[author_id] = cached_tweet['author_info']
363
364 # Only fetch tweets that weren't found in cache
365 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]
366
367 if uncached_ids:
368 batch_size = 100 # X API limit for bulk tweet lookup
369
370 for i in range(0, len(uncached_ids), batch_size):
371 batch_ids = uncached_ids[i:i + batch_size]
372 try:
373 endpoint = "/tweets"
374 params = {
375 "ids": ",".join(batch_ids),
376 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
377 "user.fields": "id,name,username",
378 "expansions": "author_id"
379 }
380 response = self._make_request(endpoint, params)
381
382 if response and "data" in response:
383 fetched_tweets = []
384 batch_users_data = {}
385
386 for missing_tweet in response["data"]:
387 # Only add if it's actually part of this conversation
388 if missing_tweet.get('conversation_id') == conversation_id:
389 tweets.append(missing_tweet)
390 tweet_ids.add(missing_tweet.get('id'))
391 fetched_tweets.append(missing_tweet)
392 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")
393
394 # Add user data if available
395 if "includes" in response and "users" in response["includes"]:
396 for user in response["includes"]["users"]:
397 users_data[user["id"]] = user
398 batch_users_data[user["id"]] = user
399
400 # Cache the newly fetched tweets
401 if fetched_tweets:
402 save_cached_tweets(fetched_tweets, batch_users_data)
403
404 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")
405
406 # Handle partial success - log any missing tweets that weren't found
407 if response and "errors" in response:
408 for error in response["errors"]:
409 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")
410
411 except Exception as e:
412 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
413 else:
414 logger.info(f"All {len(missing_list)} missing tweets found in cache")
415
416 if tweets:
417 # Filter out tweets that occur after until_id (if specified)
418 if until_id:
419 original_count = len(tweets)
420 # Convert until_id to int for comparison (Twitter IDs are sequential)
421 until_id_int = int(until_id)
422 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
423 filtered_count = len(tweets)
424 if original_count != filtered_count:
425 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")
426
427 # Sort chronologically (oldest first)
428 tweets.sort(key=lambda x: x.get('created_at', ''))
429 logger.info(f"Retrieved {len(tweets)} tweets in thread")
430
431 thread_data = {"tweets": tweets, "users": users_data}
432
433 # Cache individual tweets from the thread for future backfill
434 save_cached_tweets(tweets, users_data)
435
436 # Cache the result
437 if use_cache:
438 save_cached_thread_context(conversation_id, thread_data)
439
440 return thread_data
441 else:
442 logger.warning("No tweets found for thread context")
443 return None
444
445 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
446 """
447 Post a reply to a specific tweet.
448
449 Args:
450 reply_text: The text content of the reply
451 in_reply_to_tweet_id: The ID of the tweet to reply to
452
453 Returns:
454 Response data if successful, None if failed
455 """
456 endpoint = "/tweets"
457
458 payload = {
459 "text": reply_text,
460 "reply": {
461 "in_reply_to_tweet_id": in_reply_to_tweet_id
462 }
463 }
464
465 logger.info(f"Attempting to post reply with {self.auth_method} authentication")
466 result = self._make_request(endpoint, method="POST", data=payload)
467
468 if result:
469 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
470 return result
471 else:
472 logger.error("Failed to post reply")
473 return None
474
475 def post_tweet(self, tweet_text: str) -> Optional[Dict]:
476 """
477 Post a standalone tweet (not a reply).
478
479 Args:
480 tweet_text: The text content of the tweet (max 280 characters)
481
482 Returns:
483 Response data if successful, None if failed
484 """
485 endpoint = "/tweets"
486
487 # Validate tweet length
488 if len(tweet_text) > 280:
489 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
490 return None
491
492 payload = {
493 "text": tweet_text
494 }
495
496 logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
497 result = self._make_request(endpoint, method="POST", data=payload)
498
499 if result:
500 logger.info(f"Successfully posted tweet")
501 return result
502 else:
503 logger.error("Failed to post tweet")
504 return None
505
506def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
507 """Load X configuration from config file."""
508 try:
509 with open(config_path, 'r') as f:
510 config = yaml.safe_load(f)
511
512 x_config = config.get('x', {})
513 if not x_config.get('api_key') or not x_config.get('user_id'):
514 raise ValueError("X API key and user_id must be configured in config.yaml")
515
516 return x_config
517 except Exception as e:
518 logger.error(f"Failed to load X configuration: {e}")
519 raise
520
521def create_x_client(config_path: str = "config.yaml") -> XClient:
522 """Create and return an X client with configuration loaded from file."""
523 config = load_x_config(config_path)
524 return XClient(
525 api_key=config['api_key'],
526 user_id=config['user_id'],
527 access_token=config.get('access_token'),
528 consumer_key=config.get('consumer_key'),
529 consumer_secret=config.get('consumer_secret'),
530 access_token_secret=config.get('access_token_secret')
531 )
532
533def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
534 """
535 Convert a mention object to a YAML string for better AI comprehension.
536 Similar to thread_to_yaml_string in bsky_utils.py
537 """
538 # Extract relevant fields
539 simplified_mention = {
540 'id': mention.get('id'),
541 'text': mention.get('text'),
542 'author_id': mention.get('author_id'),
543 'created_at': mention.get('created_at'),
544 'in_reply_to_user_id': mention.get('in_reply_to_user_id')
545 }
546
547 # Add user information if available
548 if users_data and mention.get('author_id') in users_data:
549 user = users_data[mention.get('author_id')]
550 simplified_mention['author'] = {
551 'username': user.get('username'),
552 'name': user.get('name')
553 }
554
555 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)
556
557def thread_to_yaml_string(thread_data: Dict) -> str:
558 """
559 Convert X thread context to YAML string for AI comprehension.
560 Similar to Bluesky's thread_to_yaml_string function.
561
562 Args:
563 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
564
565 Returns:
566 YAML string representation of the thread
567 """
568 if not thread_data or "tweets" not in thread_data:
569 return "conversation: []\n"
570
571 tweets = thread_data["tweets"]
572 users_data = thread_data.get("users", {})
573
574 simplified_thread = {
575 "conversation": []
576 }
577
578 for tweet in tweets:
579 # Get user info
580 author_id = tweet.get('author_id')
581 author_info = {}
582 if author_id and author_id in users_data:
583 user = users_data[author_id]
584 author_info = {
585 'username': user.get('username'),
586 'name': user.get('name')
587 }
588
589 # Build tweet object (simplified for AI consumption)
590 tweet_obj = {
591 'text': tweet.get('text'),
592 'created_at': tweet.get('created_at'),
593 'author': author_info,
594 'author_id': author_id # Include user ID for block management
595 }
596
597 simplified_thread["conversation"].append(tweet_obj)
598
599 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)
600
601
602
603# X Caching and Queue System Functions
604
605def load_last_seen_id() -> Optional[str]:
606 """Load the last seen mention ID for incremental fetching."""
607 if X_LAST_SEEN_FILE.exists():
608 try:
609 with open(X_LAST_SEEN_FILE, 'r') as f:
610 data = json.load(f)
611 return data.get('last_seen_id')
612 except Exception as e:
613 logger.error(f"Error loading last seen ID: {e}")
614 return None
615
616def save_last_seen_id(mention_id: str):
617 """Save the last seen mention ID."""
618 try:
619 X_QUEUE_DIR.mkdir(exist_ok=True)
620 with open(X_LAST_SEEN_FILE, 'w') as f:
621 json.dump({
622 'last_seen_id': mention_id,
623 'updated_at': datetime.now().isoformat()
624 }, f)
625 logger.debug(f"Saved last seen ID: {mention_id}")
626 except Exception as e:
627 logger.error(f"Error saving last seen ID: {e}")
628
629def load_processed_mentions() -> set:
630 """Load the set of processed mention IDs."""
631 if X_PROCESSED_MENTIONS_FILE.exists():
632 try:
633 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
634 data = json.load(f)
635 # Keep only recent entries (last 10000)
636 if len(data) > 10000:
637 data = data[-10000:]
638 save_processed_mentions(set(data))
639 return set(data)
640 except Exception as e:
641 logger.error(f"Error loading processed mentions: {e}")
642 return set()
643
644def save_processed_mentions(processed_set: set):
645 """Save the set of processed mention IDs."""
646 try:
647 X_QUEUE_DIR.mkdir(exist_ok=True)
648 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
649 json.dump(list(processed_set), f)
650 except Exception as e:
651 logger.error(f"Error saving processed mentions: {e}")
652
653def load_downrank_users() -> Set[str]:
654 """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
655 try:
656 if not X_DOWNRANK_USERS_FILE.exists():
657 return set()
658
659 downrank_users = set()
660 with open(X_DOWNRANK_USERS_FILE, 'r') as f:
661 for line in f:
662 line = line.strip()
663 # Skip empty lines and comments
664 if line and not line.startswith('#'):
665 downrank_users.add(line)
666
667 logger.info(f"Loaded {len(downrank_users)} downrank users")
668 return downrank_users
669 except Exception as e:
670 logger.error(f"Error loading downrank users: {e}")
671 return set()
672
673def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
674 """
675 Check if we should respond to a downranked user.
676 Returns True 10% of the time for downranked users, True 100% of the time for others.
677 """
678 if user_id not in downrank_users:
679 return True
680
681 # 10% chance for downranked users
682 should_respond = random.random() < 0.1
683 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
684 return should_respond
685
686def save_mention_to_queue(mention: Dict, users_data: Optional[Dict] = None):
687 """Save a mention to the queue directory for async processing with author info.
688
689 Args:
690 mention: The mention data from X API
691 users_data: Optional dict mapping user IDs to user data (including usernames)
692 """
693 try:
694 mention_id = mention.get('id')
695 if not mention_id:
696 logger.error("Mention missing ID, cannot queue")
697 return
698
699 # Check if already processed
700 processed_mentions = load_processed_mentions()
701 if mention_id in processed_mentions:
702 logger.debug(f"Mention {mention_id} already processed, skipping")
703 return
704
705 # Create queue directory
706 X_QUEUE_DIR.mkdir(exist_ok=True)
707
708 # Create filename using hash (similar to Bluesky system)
709 mention_str = json.dumps(mention, sort_keys=True)
710 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
711 filename = f"x_mention_{mention_hash}.json"
712
713 queue_file = X_QUEUE_DIR / filename
714
715 # Extract author info if users_data provided
716 author_id = mention.get('author_id')
717 author_username = None
718 author_name = None
719
720 if users_data and author_id and author_id in users_data:
721 user_info = users_data[author_id]
722 author_username = user_info.get('username')
723 author_name = user_info.get('name')
724 logger.info(f"Caching author info for @{author_username} ({author_name})")
725
726 # Save mention data with enhanced debugging information
727 mention_data = {
728 'mention': mention,
729 'queued_at': datetime.now().isoformat(),
730 'type': 'x_mention',
731 # Cache author info for later use
732 'author_info': {
733 'username': author_username,
734 'name': author_name,
735 'id': author_id
736 },
737 # Debug info for conversation tracking
738 'debug_info': {
739 'mention_id': mention.get('id'),
740 'author_id': mention.get('author_id'),
741 'conversation_id': mention.get('conversation_id'),
742 'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
743 'referenced_tweets': mention.get('referenced_tweets', []),
744 'text_preview': mention.get('text', '')[:200],
745 'created_at': mention.get('created_at'),
746 'public_metrics': mention.get('public_metrics', {}),
747 'context_annotations': mention.get('context_annotations', [])
748 }
749 }
750
751 with open(queue_file, 'w') as f:
752 json.dump(mention_data, f, indent=2)
753
754 logger.info(f"Queued X mention {mention_id} -> {filename}")
755
756 except Exception as e:
757 logger.error(f"Error saving mention to queue: {e}")
758
759# X Cache Functions
760def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
761 """Load cached thread context if available."""
762 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
763 if cache_file.exists():
764 try:
765 with open(cache_file, 'r') as f:
766 cached_data = json.load(f)
767 # Check if cache is recent (within 1 hour)
768 from datetime import datetime, timedelta
769 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
770 if datetime.now() - cached_time < timedelta(hours=1):
771 logger.info(f"Using cached thread context for {conversation_id}")
772 return cached_data.get('thread_data')
773 except Exception as e:
774 logger.warning(f"Error loading cached thread context: {e}")
775 return None
776
777def save_cached_thread_context(conversation_id: str, thread_data: Dict):
778 """Save thread context to cache."""
779 try:
780 X_CACHE_DIR.mkdir(exist_ok=True)
781 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
782
783 cache_data = {
784 'conversation_id': conversation_id,
785 'thread_data': thread_data,
786 'cached_at': datetime.now().isoformat()
787 }
788
789 with open(cache_file, 'w') as f:
790 json.dump(cache_data, f, indent=2)
791
792 logger.debug(f"Cached thread context for {conversation_id}")
793 except Exception as e:
794 logger.error(f"Error caching thread context: {e}")
795
796def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
797 """
798 Load cached individual tweets if available.
799 Returns dict mapping tweet_id -> tweet_data for found tweets.
800 """
801 cached_tweets = {}
802
803 for tweet_id in tweet_ids:
804 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
805 if cache_file.exists():
806 try:
807 with open(cache_file, 'r') as f:
808 cached_data = json.load(f)
809
810 # Use longer cache times for older tweets (24 hours vs 1 hour)
811 from datetime import datetime, timedelta
812 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
813 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')
814
815 # Parse tweet creation time to determine age
816 try:
817 from dateutil.parser import parse
818 tweet_age = datetime.now() - parse(tweet_created)
819 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
820 except:
821 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails
822
823 if datetime.now() - cached_time < cache_duration:
824 cached_tweets[tweet_id] = cached_data.get('tweet_data')
825 logger.debug(f"Using cached tweet {tweet_id}")
826
827 except Exception as e:
828 logger.warning(f"Error loading cached tweet {tweet_id}: {e}")
829
830 return cached_tweets
831
832def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
833 """Save individual tweets to cache for future reuse."""
834 try:
835 X_CACHE_DIR.mkdir(exist_ok=True)
836
837 for tweet in tweets_data:
838 tweet_id = tweet.get('id')
839 if not tweet_id:
840 continue
841
842 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
843
844 # Include user data if available
845 tweet_with_user = tweet.copy()
846 if users_data and tweet.get('author_id') in users_data:
847 tweet_with_user['author_info'] = users_data[tweet.get('author_id')]
848
849 cache_data = {
850 'tweet_id': tweet_id,
851 'tweet_data': tweet_with_user,
852 'cached_at': datetime.now().isoformat()
853 }
854
855 with open(cache_file, 'w') as f:
856 json.dump(cache_data, f, indent=2)
857
858 logger.debug(f"Cached individual tweet {tweet_id}")
859
860 except Exception as e:
861 logger.error(f"Error caching individual tweets: {e}")
862
863def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
864 """
865 Determine if we have sufficient context to skip backfilling missing tweets.
866
867 Args:
868 tweets: List of tweets already in the thread
869 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch
870
871 Returns:
872 True if context is sufficient, False if backfill is needed
873 """
874 # If no missing tweets, context is sufficient
875 if not missing_tweet_ids:
876 return True
877
878 # If we have a substantial conversation (5+ tweets), likely sufficient
879 if len(tweets) >= 5:
880 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
881 return True
882
883 # If only a few missing tweets and we have some context, might be enough
884 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
885 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
886 return True
887
888 # Check if we have conversational flow (mentions between users)
889 has_conversation_flow = False
890 for tweet in tweets:
891 text = tweet.get('text', '').lower()
892 # Look for mentions, replies, or conversational indicators
893 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1:
894 has_conversation_flow = True
895 break
896
897 # If we have clear conversational flow and reasonable length, sufficient
898 if has_conversation_flow and len(tweets) >= 2:
899 logger.debug("Thread has conversational flow, considering sufficient")
900 return True
901
902 # Otherwise, we need to backfill
903 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
904 return False
905
906def fetch_and_queue_mentions(username: str) -> int:
907 """
908 Single-pass function to fetch new mentions and queue them.
909 Returns number of new mentions found.
910 """
911 try:
912 client = create_x_client()
913
914 # Load last seen ID for incremental fetching
915 last_seen_id = load_last_seen_id()
916
917 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")
918
919 # Search for mentions
920 # Get mentions with user data
921 result = client.get_mentions(
922 since_id=last_seen_id,
923 max_results=100 # Get as many as possible
924 )
925
926 if not result or not result["mentions"]:
927 logger.info("No new mentions found")
928 return 0
929
930 mentions = result["mentions"]
931 users_data = result["users"]
932
933 # Process mentions (newest first, so reverse to process oldest first)
934 mentions.reverse()
935 new_count = 0
936
937 for mention in mentions:
938 save_mention_to_queue(mention, users_data)
939 new_count += 1
940
941 # Update last seen ID to the most recent mention
942 if mentions:
943 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent
944 save_last_seen_id(most_recent_id)
945
946 logger.info(f"Queued {new_count} new X mentions")
947 return new_count
948
949 except Exception as e:
950 logger.error(f"Error fetching and queuing mentions: {e}")
951 return 0
952
953# Simple test function
954def get_my_user_info():
955 """Get the authenticated user's information to find correct user ID."""
956 try:
957 client = create_x_client()
958
959 # Use the /2/users/me endpoint to get authenticated user info
960 endpoint = "/users/me"
961 params = {
962 "user.fields": "id,name,username,description"
963 }
964
965 print("Fetching authenticated user information...")
966 response = client._make_request(endpoint, params=params)
967
968 if response and "data" in response:
969 user_data = response["data"]
970 print(f"✅ Found authenticated user:")
971 print(f" ID: {user_data.get('id')}")
972 print(f" Username: @{user_data.get('username')}")
973 print(f" Name: {user_data.get('name')}")
974 print(f" Description: {user_data.get('description', 'N/A')[:100]}...")
975 print(f"\n🔧 Update your config.yaml with:")
976 print(f" user_id: \"{user_data.get('id')}\"")
977 return user_data
978 else:
979 print("❌ Failed to get user information")
980 print(f"Response: {response}")
981 return None
982
983 except Exception as e:
984 print(f"Error getting user info: {e}")
985 return None
986
987def test_search_mentions():
988 """Test the search-based mention detection."""
989 try:
990 client = create_x_client()
991
992 # First get our username
993 user_info = client._make_request("/users/me", params={"user.fields": "username"})
994 if not user_info or "data" not in user_info:
995 print("❌ Could not get username")
996 return
997
998 username = user_info["data"]["username"]
999 print(f"🔍 Searching for mentions of @{username}")
1000
1001 mentions = client.search_mentions(username, max_results=5)
1002
1003 if mentions:
1004 print(f"✅ Found {len(mentions)} mentions via search:")
1005 for mention in mentions:
1006 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
1007 else:
1008 print("No mentions found via search")
1009
1010 except Exception as e:
1011 print(f"Search test failed: {e}")
1012
1013def test_fetch_and_queue():
1014 """Test the single-pass fetch and queue function."""
1015 try:
1016 client = create_x_client()
1017
1018 # Get our username
1019 user_info = client._make_request("/users/me", params={"user.fields": "username"})
1020 if not user_info or "data" not in user_info:
1021 print("❌ Could not get username")
1022 return
1023
1024 username = user_info["data"]["username"]
1025 print(f"🔄 Fetching and queueing mentions for @{username}")
1026
1027 # Show current state
1028 last_seen = load_last_seen_id()
1029 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")
1030
1031 # Fetch and queue
1032 new_count = fetch_and_queue_mentions(username)
1033
1034 if new_count > 0:
1035 print(f"✅ Queued {new_count} new mentions")
1036 print(f"📁 Check ./x_queue/ directory for queued mentions")
1037
1038 # Show updated state
1039 new_last_seen = load_last_seen_id()
1040 print(f"📍 Updated last seen ID: {new_last_seen}")
1041 else:
1042 print("ℹ️ No new mentions to queue")
1043
1044 except Exception as e:
1045 print(f"Fetch and queue test failed: {e}")
1046
1047def test_thread_context():
1048 """Test thread context retrieval from a queued mention."""
1049 try:
1050 import json
1051
1052 # Find a queued mention file
1053 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1054 if not queue_files:
1055 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1056 return
1057
1058 # Read the first mention
1059 mention_file = queue_files[0]
1060 with open(mention_file, 'r') as f:
1061 mention_data = json.load(f)
1062
1063 mention = mention_data['mention']
1064 print(f"📄 Using mention: {mention.get('id')}")
1065 print(f"📝 Text: {mention.get('text')}")
1066
1067 # Check if it has a conversation_id
1068 conversation_id = mention.get('conversation_id')
1069 if not conversation_id:
1070 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
1071 return
1072
1073 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1074
1075 # Get thread context
1076 client = create_x_client()
1077 thread_data = client.get_thread_context(conversation_id)
1078
1079 if thread_data:
1080 tweets = thread_data.get('tweets', [])
1081 print(f"✅ Retrieved thread with {len(tweets)} tweets")
1082
1083 # Convert to YAML
1084 yaml_thread = thread_to_yaml_string(thread_data)
1085
1086 # Save thread context for inspection
1087 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
1088 with open(thread_file, 'w') as f:
1089 f.write(yaml_thread)
1090
1091 print(f"💾 Saved thread context to: {thread_file}")
1092 print("\n📋 Thread preview:")
1093 print(yaml_thread)
1094 else:
1095 print("❌ Failed to retrieve thread context")
1096
1097 except Exception as e:
1098 print(f"Thread context test failed: {e}")
1099
1100def test_letta_integration(agent_id: str = None):
1101 """Test sending X thread context to Letta agent."""
1102 try:
1103 from letta_client import Letta
1104 import json
1105 import yaml
1106
1107 # Load full config to access letta section
1108 try:
1109 with open("config.yaml", 'r') as f:
1110 full_config = yaml.safe_load(f)
1111
1112 letta_config = full_config.get('letta', {})
1113 api_key = letta_config.get('api_key')
1114 config_agent_id = letta_config.get('agent_id')
1115
1116 # Use agent_id from config if not provided as parameter
1117 if not agent_id:
1118 if config_agent_id:
1119 agent_id = config_agent_id
1120 print(f"ℹ️ Using agent_id from config: {agent_id}")
1121 else:
1122 print("❌ No agent_id found in config.yaml")
1123 print("Expected config structure:")
1124 print(" letta:")
1125 print(" agent_id: your-agent-id")
1126 return
1127 else:
1128 print(f"ℹ️ Using provided agent_id: {agent_id}")
1129
1130 if not api_key:
1131 # Try loading from environment as fallback
1132 import os
1133 api_key = os.getenv('LETTA_API_KEY')
1134 if not api_key:
1135 print("❌ LETTA_API_KEY not found in config.yaml or environment")
1136 print("Expected config structure:")
1137 print(" letta:")
1138 print(" api_key: your-letta-api-key")
1139 return
1140 else:
1141 print("ℹ️ Using LETTA_API_KEY from environment")
1142 else:
1143 print("ℹ️ Using LETTA_API_KEY from config.yaml")
1144
1145 except Exception as e:
1146 print(f"❌ Error loading config: {e}")
1147 return
1148
1149 letta_client = Letta(token=api_key, timeout=600)
1150 print(f"🤖 Connected to Letta, using agent: {agent_id}")
1151
1152 # Find a queued mention file
1153 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1154 if not queue_files:
1155 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1156 return
1157
1158 # Read the first mention
1159 mention_file = queue_files[0]
1160 with open(mention_file, 'r') as f:
1161 mention_data = json.load(f)
1162
1163 mention = mention_data['mention']
1164 conversation_id = mention.get('conversation_id')
1165
1166 if not conversation_id:
1167 print("❌ No conversation_id found in mention.")
1168 return
1169
1170 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1171
1172 # Get thread context
1173 x_client = create_x_client()
1174 thread_data = x_client.get_thread_context(conversation_id)
1175
1176 if not thread_data:
1177 print("❌ Failed to retrieve thread context")
1178 return
1179
1180 # Convert to YAML
1181 yaml_thread = thread_to_yaml_string(thread_data)
1182
1183 # Create prompt for the agent
1184 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.
1185
1186Here is the thread context:
1187
1188{yaml_thread}
1189
1190Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""
1191
1192 prompt_char_count = len(prompt)
1193 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")
1194
1195 # Print the prompt in a rich panel
1196 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))
1197
1198 # Send to Letta agent using streaming
1199 message_stream = letta_client.agents.messages.create_stream(
1200 agent_id=agent_id,
1201 messages=[{"role": "user", "content": prompt}],
1202 stream_tokens=False,
1203 max_steps=10
1204 )
1205
1206 print("🔄 Streaming response from agent...")
1207 response_text = ""
1208
1209 for chunk in message_stream:
1210 print(chunk)
1211 if hasattr(chunk, 'message_type'):
1212 if chunk.message_type == 'assistant_message':
1213 print(f"🤖 Agent response: {chunk.content}")
1214 response_text = chunk.content
1215 elif chunk.message_type == 'reasoning_message':
1216 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
1217 elif chunk.message_type == 'tool_call_message':
1218 print(f"🔧 Agent tool call: {chunk.tool_call.name}")
1219
1220 if response_text:
1221 print(f"\n✅ Agent generated response:")
1222 print(f"📝 Response: {response_text}")
1223 else:
1224 print("❌ No response generated by agent")
1225
1226 except Exception as e:
1227 print(f"Letta integration test failed: {e}")
1228 import traceback
1229 traceback.print_exc()
1230
1231def test_x_client():
1232 """Test the X client by fetching mentions."""
1233 try:
1234 client = create_x_client()
1235 result = client.get_mentions(max_results=5)
1236
1237 if result and result["mentions"]:
1238 mentions = result["mentions"]
1239 users_data = result["users"]
1240 print(f"Successfully retrieved {len(mentions)} mentions:")
1241 print(f"User data available for {len(users_data)} users")
1242 for mention in mentions:
1243 author_id = mention.get('author_id')
1244 author_info = users_data.get(author_id, {})
1245 username = author_info.get('username', 'unknown')
1246 print(f"- {mention.get('id')} from @{username}: {mention.get('text')[:50]}...")
1247 else:
1248 print("No mentions retrieved")
1249
1250 except Exception as e:
1251 print(f"Test failed: {e}")
1252
1253def reply_to_cameron_post():
1254 """
1255 Reply to Cameron's specific X post.
1256
1257 NOTE: This requires OAuth User Context authentication, not Bearer token.
1258 Current Bearer token is Application-Only which can't post.
1259 """
1260 try:
1261 client = create_x_client()
1262
1263 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
1264 cameron_post_id = "1950690566909710618"
1265
1266 # Simple reply message
1267 reply_text = "Hello from void! 🤖 Testing X integration."
1268
1269 print(f"Attempting to reply to post {cameron_post_id}")
1270 print(f"Reply text: {reply_text}")
1271 print("\nNOTE: This will fail with current Bearer token (Application-Only)")
1272 print("Posting requires OAuth User Context authentication")
1273
1274 result = client.post_reply(reply_text, cameron_post_id)
1275
1276 if result:
1277 print(f"✅ Successfully posted reply!")
1278 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
1279 else:
1280 print("❌ Failed to post reply (expected with current auth)")
1281
1282 except Exception as e:
1283 print(f"Reply failed: {e}")
1284
1285def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
1286 """
1287 Process an X mention and generate a reply using the Letta agent.
1288 Similar to bsky.py process_mention but for X/Twitter.
1289
1290 Args:
1291 void_agent: The Letta agent instance
1292 x_client: The X API client
1293 mention_data: The mention data dictionary
1294 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
1295 testing_mode: If True, don't actually post to X
1296
1297 Returns:
1298 True: Successfully processed, remove from queue
1299 False: Failed but retryable, keep in queue
1300 None: Failed with non-retryable error, move to errors directory
1301 "no_reply": No reply was generated, move to no_reply directory
1302 """
1303 try:
1304 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")
1305
1306 # Extract mention details
1307 if isinstance(mention_data, dict):
1308 # Handle both raw mention and queued mention formats
1309 if 'mention' in mention_data:
1310 mention = mention_data['mention']
1311 else:
1312 mention = mention_data
1313 else:
1314 mention = mention_data
1315
1316 mention_id = mention.get('id')
1317 mention_text = mention.get('text', '')
1318 author_id = mention.get('author_id')
1319 conversation_id = mention.get('conversation_id')
1320 in_reply_to_user_id = mention.get('in_reply_to_user_id')
1321 referenced_tweets = mention.get('referenced_tweets', [])
1322
1323 # Check downrank list - only respond to downranked users 10% of the time
1324 downrank_users = load_downrank_users()
1325 if not should_respond_to_downranked_user(str(author_id), downrank_users):
1326 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
1327 return "no_reply"
1328
1329 # Enhanced conversation tracking for debug - especially important for Grok handling
1330 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
1331 logger.info(f" Author ID: {author_id}")
1332 logger.info(f" Conversation ID: {conversation_id}")
1333 logger.info(f" In Reply To User ID: {in_reply_to_user_id}")
1334 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items")
1335 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets
1336 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
1337 logger.info(f" Text preview: {mention_text[:100]}...")
1338
1339 # If no conversation_id, try to use referenced tweet as conversation root
1340 if not conversation_id and referenced_tweets:
1341 # For replies, use the tweet being replied to as conversation root
1342 for ref in referenced_tweets:
1343 if ref.get('type') == 'replied_to':
1344 conversation_id = ref.get('id')
1345 logger.info(f"📎 No conversation_id, using replied_to tweet {conversation_id} as conversation root")
1346 break
1347
1348 if not conversation_id:
1349 # If still no conversation ID, use the mention itself as a standalone
1350 conversation_id = mention_id
1351 logger.warning(f"⚠️ No conversation_id found for mention {mention_id} - treating as standalone tweet")
1352
1353 # Get thread context with caching enabled for efficiency
1354 # Use mention_id as until_id to exclude tweets that occurred after this mention
1355 try:
1356 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
1357 if not thread_data:
1358 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
1359 return False
1360
1361 # If this mention references a specific tweet, ensure we have that tweet in context
1362 if referenced_tweets:
1363 for ref in referenced_tweets:
1364 if ref.get('type') == 'replied_to':
1365 ref_id = ref.get('id')
1366 # Check if the referenced tweet is in our thread data
1367 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
1368 if ref_id and ref_id not in thread_tweet_ids:
1369 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
1370 try:
1371 # Fetch the missing referenced tweet directly
1372 endpoint = f"/tweets/{ref_id}"
1373 params = {
1374 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
1375 "user.fields": "id,name,username",
1376 "expansions": "author_id"
1377 }
1378 response = x_client._make_request(endpoint, params)
1379 if response and "data" in response:
1380 missing_tweet = response["data"]
1381 if missing_tweet.get('conversation_id') == conversation_id:
1382 # Add to thread data
1383 if 'tweets' not in thread_data:
1384 thread_data['tweets'] = []
1385 thread_data['tweets'].append(missing_tweet)
1386
1387 # Add user data if available
1388 if "includes" in response and "users" in response["includes"]:
1389 if 'users' not in thread_data:
1390 thread_data['users'] = {}
1391 for user in response["includes"]["users"]:
1392 thread_data['users'][user["id"]] = user
1393
1394 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
1395 else:
1396 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
1397 except Exception as e:
1398 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")
1399
1400 # Enhanced thread context debugging
1401 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
1402 thread_posts = thread_data.get('tweets', [])
1403 thread_users = thread_data.get('users', {})
1404 logger.info(f" Posts in thread: {len(thread_posts)}")
1405 logger.info(f" Users in thread: {len(thread_users)}")
1406
1407 # Log thread participants for Grok detection
1408 for user_id, user_info in thread_users.items():
1409 username = user_info.get('username', 'unknown')
1410 name = user_info.get('name', 'Unknown')
1411 is_verified = user_info.get('verified', False)
1412 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}")
1413
1414 # Special logging for Grok or AI-related users
1415 if 'grok' in username.lower() or 'grok' in name.lower():
1416 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})")
1417
1418 # Log conversation structure
1419 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts
1420 post_id = post.get('id')
1421 post_author = post.get('author_id')
1422 post_text = post.get('text', '')[:50]
1423 is_reply = 'in_reply_to_user_id' in post
1424 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")
1425
1426 except Exception as e:
1427 logger.error(f"❌ Error getting thread context: {e}")
1428 return False
1429
1430 # Convert to YAML string
1431 thread_context = thread_to_yaml_string(thread_data)
1432 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")
1433
1434 # Save comprehensive conversation data for debugging
1435 try:
1436 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1437 debug_dir.mkdir(parents=True, exist_ok=True)
1438
1439 # Save raw thread data (JSON)
1440 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
1441 json.dump(thread_data, f, indent=2)
1442
1443 # Save YAML thread context
1444 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
1445 f.write(thread_context)
1446
1447 # Save mention processing debug info
1448 debug_info = {
1449 'processed_at': datetime.now().isoformat(),
1450 'mention_id': mention_id,
1451 'conversation_id': conversation_id,
1452 'author_id': author_id,
1453 'in_reply_to_user_id': in_reply_to_user_id,
1454 'referenced_tweets': referenced_tweets,
1455 'thread_stats': {
1456 'total_posts': len(thread_posts),
1457 'total_users': len(thread_users),
1458 'yaml_length': len(thread_context)
1459 },
1460 'users_in_conversation': {
1461 user_id: {
1462 'username': user_info.get('username'),
1463 'name': user_info.get('name'),
1464 'verified': user_info.get('verified', False),
1465 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
1466 }
1467 for user_id, user_info in thread_users.items()
1468 }
1469 }
1470
1471 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
1472 json.dump(debug_info, f, indent=2)
1473
1474 logger.info(f"💾 Saved conversation debug data to: {debug_dir}")
1475
1476 except Exception as debug_error:
1477 logger.warning(f"Failed to save debug data: {debug_error}")
1478 # Continue processing even if debug save fails
1479
1480 # Check for #voidstop
1481 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
1482 logger.info("Found #voidstop, skipping this mention")
1483 return True
1484
1485 # Note: X user block attachment removed - no longer using user-specific memory blocks
1486
1487 # Create prompt for Letta agent
1488 # First try to use cached author info from queued mention
1489 author_username = 'unknown'
1490 author_name = 'unknown'
1491
1492 if 'author_info' in mention_data:
1493 # Use cached author info from when mention was queued
1494 cached_info = mention_data['author_info']
1495 if cached_info.get('username'):
1496 author_username = cached_info['username']
1497 author_name = cached_info.get('name', author_username)
1498 logger.info(f"Using cached author info: @{author_username} ({author_name})")
1499
1500 # If not cached, try thread data
1501 if author_username == 'unknown':
1502 author_info = thread_data.get('users', {}).get(author_id, {})
1503 author_username = author_info.get('username', 'unknown')
1504 author_name = author_info.get('name', author_username)
1505
1506 # Final fallback: if username is still unknown, try to find it in the thread tweets
1507 if author_username == 'unknown' and 'tweets' in thread_data:
1508 for tweet in thread_data['tweets']:
1509 if tweet.get('author_id') == author_id and 'author' in tweet:
1510 tweet_author = tweet['author']
1511 if tweet_author.get('username'):
1512 author_username = tweet_author['username']
1513 author_name = tweet_author.get('name', author_username)
1514 logger.info(f"Resolved unknown author via thread fallback: @{author_username} ({author_name})")
1515 break
1516
1517 # Build user ID mapping from thread data
1518 user_id_mapping = {}
1519 if 'tweets' in thread_data:
1520 for tweet in thread_data['tweets']:
1521 author_id_tweet = tweet.get('author_id')
1522 if author_id_tweet and 'author' in tweet:
1523 tweet_author = tweet['author']
1524 username = tweet_author.get('username')
1525 if username and author_id_tweet not in user_id_mapping:
1526 user_id_mapping[author_id_tweet] = f"@{username}"
1527
1528 # Also add users from the users dict if available
1529 if 'users' in thread_data:
1530 for user_id, user_data in thread_data['users'].items():
1531 username = user_data.get('username')
1532 if username and user_id not in user_id_mapping:
1533 user_id_mapping[user_id] = f"@{username}"
1534
1535 # Format user ID mapping for prompt
1536 id_mapping_text = ""
1537 if user_id_mapping:
1538 id_mapping_lines = [f" {user_id}: {handle}" for user_id, handle in user_id_mapping.items()]
1539 id_mapping_text = f"\n\nUSER_ID_KEY:\n" + "\n".join(id_mapping_lines)
1540
1541 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).
1542
1543MOST RECENT POST (the mention you're responding to):
1544"{mention_text}"
1545
1546FULL THREAD CONTEXT:
1547```yaml
1548{thread_context}
1549```
1550
1551The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.{id_mapping_text}
1552
1553If you need to update user information, use the x_user_* tools.
1554
1555To reply, use the add_post_to_x_thread tool:
1556- Each call creates one post (max 280 characters)
1557- For most responses, a single call is sufficient
1558- Only use multiple calls for threaded replies when:
1559 * The topic requires extended explanation that cannot fit in 280 characters
1560 * You're explicitly asked for a detailed/long response
1561 * The conversation naturally benefits from a structured multi-part answer
1562- Avoid unnecessary threads - be concise when possible"""
1563
1564 # Log mention processing
1565 title = f"X MENTION FROM @{author_username}"
1566 print(f"\n▶ {title}")
1567 print(f" {'═' * len(title)}")
1568 for line in mention_text.split('\n'):
1569 print(f" {line}")
1570
1571 # Send to Letta agent
1572 from config_loader import get_letta_config
1573 from letta_client import Letta
1574
1575 config = get_letta_config()
1576 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
1577
1578 prompt_char_count = len(prompt)
1579 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")
1580
1581 try:
1582 # Use streaming to avoid timeout errors
1583 message_stream = letta_client.agents.messages.create_stream(
1584 agent_id=void_agent.id,
1585 messages=[{"role": "user", "content": prompt}],
1586 stream_tokens=False,
1587 max_steps=100
1588 )
1589
1590 # Collect streaming response (simplified version of bsky.py logic)
1591 all_messages = []
1592 for chunk in message_stream:
1593 if hasattr(chunk, 'message_type'):
1594 if chunk.message_type == 'reasoning_message':
1595 print("\n◆ Reasoning")
1596 print(" ─────────")
1597 for line in chunk.reasoning.split('\n'):
1598 print(f" {line}")
1599 elif chunk.message_type == 'tool_call_message':
1600 tool_name = chunk.tool_call.name
1601 if tool_name == 'add_post_to_x_thread':
1602 try:
1603 args = json.loads(chunk.tool_call.arguments)
1604 text = args.get('text', '')
1605 if text:
1606 print("\n✎ X Post")
1607 print(" ────────")
1608 for line in text.split('\n'):
1609 print(f" {line}")
1610 except:
1611 pass
1612 elif tool_name == 'halt_activity':
1613 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
1614 if queue_filepath and queue_filepath.exists():
1615 queue_filepath.unlink()
1616 logger.info(f"Deleted queue file: {queue_filepath.name}")
1617 logger.info("=== X BOT TERMINATED BY AGENT ===")
1618 exit(0)
1619 elif chunk.message_type == 'tool_return_message':
1620 tool_name = chunk.name
1621 status = chunk.status
1622 if status == 'success' and tool_name == 'add_post_to_x_thread':
1623 print("\n✓ X Post Queued")
1624 print(" ──────────────")
1625 print(" Post queued successfully")
1626 elif chunk.message_type == 'assistant_message':
1627 print("\n▶ Assistant Response")
1628 print(" ──────────────────")
1629 for line in chunk.content.split('\n'):
1630 print(f" {line}")
1631
1632 all_messages.append(chunk)
1633 if str(chunk) == 'done':
1634 break
1635
1636 # Convert streaming response for compatibility
1637 message_response = type('StreamingResponse', (), {
1638 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
1639 })()
1640
1641 except Exception as api_error:
1642 logger.error(f"Letta API error: {api_error}")
1643 raise
1644
1645 # Extract successful add_post_to_x_thread tool calls
1646 reply_candidates = []
1647 tool_call_results = {}
1648 ignored_notification = False
1649 ack_note = None # Track any note from annotate_ack tool
1650
1651 # First pass: collect tool return statuses
1652 for message in message_response.messages:
1653 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
1654 if message.name == 'add_post_to_x_thread':
1655 tool_call_results[message.tool_call_id] = message.status
1656 elif message.name == 'ignore_notification':
1657 if message.status == 'success':
1658 ignored_notification = True
1659 logger.info("🚫 X notification ignored")
1660
1661 # Second pass: collect successful tool calls
1662 for message in message_response.messages:
1663 if hasattr(message, 'tool_call') and message.tool_call:
1664 # Collect annotate_ack tool calls
1665 if message.tool_call.name == 'annotate_ack':
1666 try:
1667 args = json.loads(message.tool_call.arguments)
1668 note = args.get('note', '')
1669 if note:
1670 ack_note = note
1671 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
1672 except json.JSONDecodeError as e:
1673 logger.error(f"Failed to parse annotate_ack arguments: {e}")
1674
1675 # Collect add_post_to_x_thread tool calls - only if they were successful
1676 elif message.tool_call.name == 'add_post_to_x_thread':
1677 tool_call_id = message.tool_call.tool_call_id
1678 tool_status = tool_call_results.get(tool_call_id, 'unknown')
1679
1680 if tool_status == 'success':
1681 try:
1682 args = json.loads(message.tool_call.arguments)
1683 reply_text = args.get('text', '')
1684 if reply_text:
1685 reply_candidates.append(reply_text)
1686 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
1687 except json.JSONDecodeError as e:
1688 logger.error(f"Failed to parse tool call arguments: {e}")
1689
1690 # Save agent response data to debug folder
1691 try:
1692 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1693
1694 # Save complete agent interaction
1695 agent_response_data = {
1696 'processed_at': datetime.now().isoformat(),
1697 'mention_id': mention_id,
1698 'conversation_id': conversation_id,
1699 'prompt_sent': prompt,
1700 'reply_candidates': reply_candidates,
1701 'ignored_notification': ignored_notification,
1702 'ack_note': ack_note,
1703 'tool_call_results': tool_call_results,
1704 'all_messages': []
1705 }
1706
1707 # Convert messages to serializable format
1708 for message in message_response.messages:
1709 msg_data = {
1710 'message_type': getattr(message, 'message_type', 'unknown'),
1711 'content': getattr(message, 'content', ''),
1712 'reasoning': getattr(message, 'reasoning', ''),
1713 'status': getattr(message, 'status', ''),
1714 'name': getattr(message, 'name', ''),
1715 }
1716
1717 if hasattr(message, 'tool_call') and message.tool_call:
1718 msg_data['tool_call'] = {
1719 'name': message.tool_call.name,
1720 'arguments': message.tool_call.arguments,
1721 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
1722 }
1723
1724 agent_response_data['all_messages'].append(msg_data)
1725
1726 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
1727 json.dump(agent_response_data, f, indent=2)
1728
1729 logger.info(f"💾 Saved agent response debug data")
1730
1731 except Exception as debug_error:
1732 logger.warning(f"Failed to save agent response debug data: {debug_error}")
1733
1734 # Handle conflicts
1735 if reply_candidates and ignored_notification:
1736 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
1737 return False
1738
1739 if reply_candidates:
1740 # Post replies to X
1741 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")
1742
1743 if len(reply_candidates) == 1:
1744 content = reply_candidates[0]
1745 title = f"Reply to @{author_username}"
1746 else:
1747 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
1748 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"
1749
1750 print(f"\n✎ {title}")
1751 print(f" {'─' * len(title)}")
1752 for line in content.split('\n'):
1753 print(f" {line}")
1754
1755 if testing_mode:
1756 logger.info("TESTING MODE: Skipping actual X post")
1757 return True
1758 else:
1759 # Post to X using thread approach
1760 success = post_x_thread_replies(x_client, mention_id, reply_candidates)
1761 if success:
1762 logger.info(f"Successfully replied to @{author_username} on X")
1763
1764 # Acknowledge the post we're replying to
1765 try:
1766 ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
1767 if ack_result:
1768 if ack_note:
1769 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
1770 else:
1771 logger.info(f"Successfully acknowledged X post from @{author_username}")
1772 else:
1773 logger.warning(f"Failed to acknowledge X post from @{author_username}")
1774 except Exception as e:
1775 logger.error(f"Error acknowledging X post from @{author_username}: {e}")
1776 # Don't fail the entire operation if acknowledgment fails
1777
1778 return True
1779 else:
1780 logger.error(f"Failed to send reply to @{author_username} on X")
1781 return False
1782 else:
1783 if ignored_notification:
1784 logger.info(f"X mention from @{author_username} was explicitly ignored")
1785 return "ignored"
1786 else:
1787 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
1788 return False # Keep in queue for retry instead of removing
1789
1790 except Exception as e:
1791 logger.error(f"Error processing X mention: {e}")
1792 return False
1793
1794def acknowledge_x_post(x_client, post_id, note=None):
1795 """
1796 Acknowledge an X post that we replied to.
1797 Uses the same Bluesky client and uploads to the void data repository on atproto,
1798 just like Bluesky acknowledgments.
1799
1800 Args:
1801 x_client: XClient instance (not used, kept for compatibility)
1802 post_id: The X post ID we're acknowledging
1803 note: Optional note to include with the acknowledgment
1804
1805 Returns:
1806 True if successful, False otherwise
1807 """
1808 try:
1809 # Use Bluesky client to upload acks to the void data repository on atproto
1810 bsky_client = bsky_utils.default_login()
1811
1812 # Create a synthetic URI and CID for the X post
1813 # X posts don't have atproto URIs/CIDs, so we create identifiers
1814 post_uri = f"x://twitter.com/post/{post_id}"
1815 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts
1816
1817 # Use the same acknowledge_post function as Bluesky
1818 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)
1819
1820 if ack_result:
1821 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
1822 return True
1823 else:
1824 logger.error(f"Failed to acknowledge X post {post_id}")
1825 return False
1826
1827 except Exception as e:
1828 logger.error(f"Error acknowledging X post {post_id}: {e}")
1829 return False
1830
1831def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
1832 """
1833 Post a series of replies to X, threading them properly.
1834
1835 Args:
1836 x_client: XClient instance
1837 in_reply_to_tweet_id: The original tweet ID to reply to
1838 reply_messages: List of reply text strings
1839
1840 Returns:
1841 True if successful, False otherwise
1842 """
1843 try:
1844 current_reply_id = in_reply_to_tweet_id
1845
1846 for i, reply_text in enumerate(reply_messages):
1847 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")
1848
1849 result = x_client.post_reply(reply_text, current_reply_id)
1850
1851 if result and 'data' in result:
1852 new_tweet_id = result['data']['id']
1853 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
1854 # For threading, the next reply should reply to this one
1855 current_reply_id = new_tweet_id
1856 else:
1857 logger.error(f"Failed to post X reply {i+1}")
1858 return False
1859
1860 return True
1861
1862 except Exception as e:
1863 logger.error(f"Error posting X thread replies: {e}")
1864 return False
1865
1866def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
1867 """
1868 Load and process all X mentions from the queue.
1869 Similar to bsky.py load_and_process_queued_notifications but for X.
1870 """
1871 try:
1872 # Get all X mention files in queue directory
1873 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1874
1875 if not queue_files:
1876 return
1877
1878 # Load file metadata and sort by creation time (chronological order)
1879 file_metadata = []
1880 for filepath in queue_files:
1881 try:
1882 with open(filepath, 'r') as f:
1883 queue_data = json.load(f)
1884 mention_data = queue_data.get('mention', queue_data)
1885 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing
1886 file_metadata.append((created_at, filepath))
1887 except Exception as e:
1888 logger.warning(f"Error reading queue file {filepath.name}: {e}")
1889 # Add with default timestamp so it still gets processed
1890 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))
1891
1892 # Sort by creation time (oldest first)
1893 file_metadata.sort(key=lambda x: x[0])
1894
1895 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")
1896
1897 for i, (created_at, filepath) in enumerate(file_metadata, 1):
1898 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")
1899
1900 try:
1901 # Load mention data
1902 with open(filepath, 'r') as f:
1903 queue_data = json.load(f)
1904
1905 # Process the mention (pass full queue_data to have access to author_info)
1906 success = process_x_mention(void_agent, x_client, queue_data,
1907 queue_filepath=filepath, testing_mode=testing_mode)
1908
1909 except XRateLimitError:
1910 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
1911 break
1912
1913 except Exception as e:
1914 logger.error(f"Error processing X queue file {filepath.name}: {e}")
1915 continue
1916
1917 # Handle file based on processing result
1918 if success:
1919 if testing_mode:
1920 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
1921 else:
1922 filepath.unlink()
1923 logger.info(f"Successfully processed and removed X file: {filepath.name}")
1924
1925 # Mark as processed
1926 processed_mentions = load_processed_mentions()
1927 processed_mentions.add(mention_data.get('id'))
1928 save_processed_mentions(processed_mentions)
1929
1930 elif success is None: # Move to error directory
1931 error_dir = X_QUEUE_DIR / "errors"
1932 error_dir.mkdir(exist_ok=True)
1933 error_path = error_dir / filepath.name
1934 filepath.rename(error_path)
1935 logger.warning(f"Moved X file {filepath.name} to errors directory")
1936
1937 elif success == "no_reply": # Move to no_reply directory
1938 no_reply_dir = X_QUEUE_DIR / "no_reply"
1939 no_reply_dir.mkdir(exist_ok=True)
1940 no_reply_path = no_reply_dir / filepath.name
1941 filepath.rename(no_reply_path)
1942 logger.info(f"Moved X file {filepath.name} to no_reply directory")
1943
1944 elif success == "ignored": # Delete ignored notifications
1945 filepath.unlink()
1946 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")
1947
1948 else:
1949 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")
1950
1951 except Exception as e:
1952 logger.error(f"Error loading queued X mentions: {e}")
1953
1954def process_x_notifications(void_agent, x_client, testing_mode=False):
1955 """
1956 Fetch new X mentions, queue them, and process the queue.
1957 Similar to bsky.py process_notifications but for X.
1958 """
1959 try:
1960 # Get username for fetching mentions
1961 user_info = x_client._make_request("/users/me", params={"user.fields": "username"})
1962 if not user_info or "data" not in user_info:
1963 logger.error("Could not get username for X mentions")
1964 return
1965
1966 username = user_info["data"]["username"]
1967
1968 # Fetch and queue new mentions
1969 new_count = fetch_and_queue_mentions(username)
1970
1971 if new_count > 0:
1972 logger.info(f"Found {new_count} new X mentions to process")
1973
1974 # Process the entire queue
1975 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
1976
1977 except Exception as e:
1978 logger.error(f"Error processing X notifications: {e}")
1979
1980def periodic_user_block_cleanup(client, agent_id: str) -> None:
1981 """
1982 Detach all user blocks from the agent to prevent memory bloat.
1983 This should be called periodically to ensure clean state.
1984 """
1985 try:
1986 # Get all blocks attached to the agent
1987 attached_blocks = client.agents.blocks.list(agent_id=agent_id)
1988
1989 user_blocks_to_detach = []
1990 for block in attached_blocks:
1991 if hasattr(block, 'label') and block.label.startswith('user_'):
1992 user_blocks_to_detach.append({
1993 'label': block.label,
1994 'id': block.id
1995 })
1996
1997 if not user_blocks_to_detach:
1998 logger.debug("No user blocks found to detach during periodic cleanup")
1999 return
2000
2001 logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach")
2002
2003 # Detach each user block
2004 for block in user_blocks_to_detach:
2005 try:
2006 client.agents.blocks.detach(
2007 agent_id=agent_id,
2008 block_id=block['id']
2009 )
2010 logger.debug(f"Detached user block: {block['label']}")
2011 except Exception as e:
2012 logger.error(f"Failed to detach user block {block['label']}: {e}")
2013
2014 logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks")
2015
2016 except Exception as e:
2017 logger.error(f"Error during periodic user block cleanup: {e}")
2018
2019def initialize_x_void():
2020 """Initialize the void agent for X operations."""
2021 logger.info("Starting void agent initialization for X...")
2022
2023 from config_loader import get_letta_config
2024 from letta_client import Letta
2025
2026 # Get config
2027 config = get_letta_config()
2028 client = Letta(token=config['api_key'], timeout=config['timeout'])
2029 agent_id = config['agent_id']
2030
2031 try:
2032 void_agent = client.agents.retrieve(agent_id=agent_id)
2033 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
2034 except Exception as e:
2035 logger.error(f"Failed to load void agent {agent_id}: {e}")
2036 raise e
2037
2038 # Clean up all user blocks at startup
2039 logger.info("🧹 Cleaning up user blocks at X startup...")
2040 periodic_user_block_cleanup(client, agent_id)
2041
2042 # Ensure correct tools are attached for X
2043 logger.info("Configuring tools for X platform...")
2044 try:
2045 from tool_manager import ensure_platform_tools
2046 ensure_platform_tools('x', void_agent.id)
2047 except Exception as e:
2048 logger.error(f"Failed to configure platform tools: {e}")
2049 logger.warning("Continuing with existing tool configuration")
2050
2051 # Log agent details
2052 logger.info(f"X Void agent details - ID: {void_agent.id}")
2053 logger.info(f"Agent name: {void_agent.name}")
2054
2055 return void_agent
2056
2057def send_x_synthesis_message(letta_client, agent_id):
2058 """
2059 Send a simple synthesis message to the X agent.
2060 Simplified version for X bot without atproto dependencies.
2061 """
2062 try:
2063 from datetime import date
2064 today = date.today()
2065
2066 synthesis_prompt = f"""Time for synthesis and reflection on your X (Twitter) experiences.
2067
2068Today's date: {today.strftime('%B %d, %Y')}
2069
2070Please reflect on:
20711. Recent X interactions and conversations you've had
20722. Patterns you've observed in X discussions
20733. Your evolving understanding of the X platform dynamics
20744. Notable exchanges or insights from X users
20755. How your presence on X is developing
2076
2077Use your memory tools to record significant insights and observations."""
2078
2079 logger.info("🧠 Sending X synthesis message to agent")
2080
2081 # Send simple synthesis message
2082 response = letta_client.agents.messages.create(
2083 agent_id=agent_id,
2084 messages=[{"role": "user", "content": synthesis_prompt}],
2085 max_steps=50
2086 )
2087
2088 logger.info("✅ X synthesis message completed successfully")
2089 return True
2090
2091 except Exception as e:
2092 logger.error(f"Error sending X synthesis message: {e}")
2093 return False
2094
2095def x_main_loop(testing_mode=False, cleanup_interval=10, synthesis_interval=600, synthesis_only=False):
2096 """
2097 Main X bot loop that continuously monitors for mentions and processes them.
2098 Similar to bsky.py main() but for X/Twitter.
2099
2100 Args:
2101 testing_mode: If True, don't actually post to X
2102 cleanup_interval: Run user block cleanup every N cycles (0 to disable)
2103 synthesis_interval: Send synthesis message every N seconds (0 to disable)
2104 synthesis_only: If True, only send synthesis messages (no notification processing)
2105 """
2106 import time
2107 from time import sleep
2108 from config_loader import get_letta_config
2109 from letta_client import Letta
2110 # We'll create our own X synthesis function instead
2111
2112 logger.info("=== STARTING X VOID BOT ===")
2113
2114 # Initialize void agent
2115 void_agent = initialize_x_void()
2116 logger.info(f"X void agent initialized: {void_agent.id}")
2117
2118 # Initialize X client
2119 x_client = create_x_client()
2120 logger.info("Connected to X API")
2121
2122 # Get Letta client for periodic cleanup
2123 config = get_letta_config()
2124 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
2125
2126 # Main loop
2127 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (reduced from 60s to conserve API calls)
2128 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")
2129
2130 if testing_mode:
2131 logger.info("=== RUNNING IN X TESTING MODE ===")
2132 logger.info(" - No messages will be sent to X")
2133 logger.info(" - Queue files will not be deleted")
2134
2135 if cleanup_interval > 0:
2136 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
2137 else:
2138 logger.info("User block cleanup disabled")
2139
2140 if synthesis_interval > 0:
2141 logger.info(f"Synthesis messages enabled every {synthesis_interval} seconds ({synthesis_interval/60:.1f} minutes)")
2142 else:
2143 logger.info("Synthesis messages disabled")
2144
2145 # Synthesis-only mode
2146 if synthesis_only:
2147 if synthesis_interval <= 0:
2148 logger.error("Synthesis-only mode requires --synthesis-interval > 0")
2149 return
2150
2151 logger.info(f"Starting X synthesis-only mode, interval: {synthesis_interval} seconds ({synthesis_interval/60:.1f} minutes)")
2152
2153 while True:
2154 try:
2155 # Send synthesis message immediately on first run
2156 logger.info("🧠 Sending X synthesis message")
2157 send_x_synthesis_message(letta_client, void_agent.id)
2158
2159 # Wait for next interval
2160 logger.info(f"Waiting {synthesis_interval} seconds until next synthesis...")
2161 sleep(synthesis_interval)
2162
2163 except KeyboardInterrupt:
2164 logger.info("=== X SYNTHESIS MODE STOPPED BY USER ===")
2165 break
2166 except Exception as e:
2167 logger.error(f"Error in X synthesis loop: {e}")
2168 logger.info(f"Sleeping for {synthesis_interval} seconds due to error...")
2169 sleep(synthesis_interval)
2170
2171 cycle_count = 0
2172 start_time = time.time()
2173 last_synthesis_time = time.time()
2174
2175 while True:
2176 try:
2177 cycle_count += 1
2178 logger.info(f"=== X CYCLE {cycle_count} ===")
2179
2180 # Process X notifications (fetch, queue, and process)
2181 process_x_notifications(void_agent, x_client, testing_mode)
2182
2183 # Run periodic cleanup every N cycles
2184 if cleanup_interval > 0 and cycle_count % cleanup_interval == 0:
2185 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
2186 periodic_user_block_cleanup(letta_client, void_agent.id)
2187
2188 # Check if synthesis interval has passed
2189 if synthesis_interval > 0:
2190 current_time = time.time()
2191 if current_time - last_synthesis_time >= synthesis_interval:
2192 logger.info(f"⏰ {synthesis_interval/60:.1f} minutes have passed, triggering X synthesis")
2193 send_x_synthesis_message(letta_client, void_agent.id)
2194 last_synthesis_time = current_time
2195
2196 # Log cycle completion
2197 elapsed_time = time.time() - start_time
2198 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")
2199
2200 sleep(FETCH_DELAY_SEC)
2201
2202 except KeyboardInterrupt:
2203 elapsed_time = time.time() - start_time
2204 logger.info("=== X BOT STOPPED BY USER ===")
2205 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
2206 break
2207 except Exception as e:
2208 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
2209 logger.error(f"Error details: {e}")
2210 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
2211 sleep(FETCH_DELAY_SEC * 2)
2212
2213def process_queue_only(testing_mode=False):
2214 """
2215 Process all queued X mentions without fetching new ones.
2216 Useful for rate limit management - queue first, then process separately.
2217
2218 Args:
2219 testing_mode: If True, don't actually post to X and keep queue files
2220 """
2221 logger.info("=== PROCESSING X QUEUE ONLY ===")
2222
2223 if testing_mode:
2224 logger.info("=== RUNNING IN X TESTING MODE ===")
2225 logger.info(" - No messages will be sent to X")
2226 logger.info(" - Queue files will not be deleted")
2227
2228 try:
2229 # Initialize void agent
2230 void_agent = initialize_x_void()
2231 logger.info(f"X void agent initialized: {void_agent.id}")
2232
2233 # Initialize X client
2234 x_client = create_x_client()
2235 logger.info("Connected to X API")
2236
2237 # Process the queue without fetching new mentions
2238 logger.info("Processing existing X queue...")
2239 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
2240
2241 logger.info("=== X QUEUE PROCESSING COMPLETE ===")
2242
2243 except Exception as e:
2244 logger.error(f"Error processing X queue: {e}")
2245 raise
2246
2247def x_notification_loop():
2248 """
2249 DEPRECATED: Old X notification loop using search-based mention detection.
2250 Use x_main_loop() instead for the full bot experience.
2251 """
2252 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
2253 x_main_loop()
2254
2255if __name__ == "__main__":
2256 import sys
2257 import argparse
2258
2259 if len(sys.argv) > 1:
2260 if sys.argv[1] == "bot":
2261 # Main bot with optional --test flag and cleanup interval
2262 parser = argparse.ArgumentParser(description='X Void Bot')
2263 parser.add_argument('command', choices=['bot'])
2264 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
2265 parser.add_argument('--cleanup-interval', type=int, default=10,
2266 help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
2267 parser.add_argument('--synthesis-interval', type=int, default=600,
2268 help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
2269 parser.add_argument('--synthesis-only', action='store_true',
2270 help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
2271 args = parser.parse_args()
2272 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval,
2273 synthesis_interval=args.synthesis_interval, synthesis_only=args.synthesis_only)
2274 elif sys.argv[1] == "loop":
2275 x_notification_loop()
2276 elif sys.argv[1] == "reply":
2277 reply_to_cameron_post()
2278 elif sys.argv[1] == "me":
2279 get_my_user_info()
2280 elif sys.argv[1] == "search":
2281 test_search_mentions()
2282 elif sys.argv[1] == "queue":
2283 test_fetch_and_queue()
2284 elif sys.argv[1] == "thread":
2285 test_thread_context()
2286 elif sys.argv[1] == "process":
2287 # Process all queued mentions with optional --test flag
2288 testing_mode = "--test" in sys.argv
2289 process_queue_only(testing_mode=testing_mode)
2290 elif sys.argv[1] == "letta":
2291 # Use specific agent ID if provided, otherwise use from config
2292 agent_id = sys.argv[2] if len(sys.argv) > 2 else None
2293 test_letta_integration(agent_id)
2294 elif sys.argv[1] == "downrank":
2295 # View or manage downrank list
2296 if len(sys.argv) > 2 and sys.argv[2] == "list":
2297 downrank_users = load_downrank_users()
2298 if downrank_users:
2299 print(f"📋 Downrank users ({len(downrank_users)} total):")
2300 for user_id in sorted(downrank_users):
2301 print(f" - {user_id}")
2302 else:
2303 print("📋 No downrank users configured")
2304 else:
2305 print("Usage: python x.py downrank list")
2306 print(" list - Show all downranked user IDs")
2307 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list")
2308 else:
2309 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
2310 print(" bot - Run the main X bot (use --test for testing mode)")
2311 print(" Example: python x.py bot --test")
2312 print(" queue - Fetch and queue mentions only (no processing)")
2313 print(" process - Process all queued mentions only (no fetching)")
2314 print(" Example: python x.py process --test")
2315 print(" downrank - Manage downrank users (10% response rate)")
2316 print(" Example: python x.py downrank list")
2317 print(" loop - Run the old notification monitoring loop (deprecated)")
2318 print(" reply - Reply to Cameron's specific post")
2319 print(" me - Get authenticated user info and correct user ID")
2320 print(" search - Test search-based mention detection")
2321 print(" thread - Test thread context retrieval from queued mention")
2322 print(" letta - Test sending thread context to Letta agent")
2323 print(" Optional: python x.py letta <agent-id>")
2324 else:
2325 test_x_client()