"""X (Twitter) client for void, a digital person for Bluesky."""
import os
import logging
import requests
import yaml
import json
import hashlib
import random
import time
from typing import Optional, Dict, Any, List, Set
from datetime import datetime
from pathlib import Path
from requests_oauthlib import OAuth1
from rich import print as rprint
from rich.panel import Panel
from rich.text import Text

import bsky_utils


class XRateLimitError(Exception):
    """Exception raised when X API rate limit is exceeded"""
    pass


# Configure logging (will be updated by setup_logging_from_config if called)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")
29
30def setup_logging_from_config(config_path: str = "x_config.yaml"):
31 """Configure logging based on x_config.yaml settings."""
32 try:
33 config = load_x_config(config_path)
34 logging_config = config.get('logging', {})
35 log_level = logging_config.get('level', 'INFO').upper()
36
37 # Convert string level to logging constant
38 numeric_level = getattr(logging, log_level, logging.INFO)
39
40 # Update the root logger level
41 logging.getLogger().setLevel(numeric_level)
42 # Update our specific logger level
43 logger.setLevel(numeric_level)
44
45 logger.info(f"Logging level set to {log_level}")
46
47 except Exception as e:
48 logger.warning(f"Failed to configure logging from config: {e}, using default INFO level")
49
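# Usage sketch (assumption: called once at startup, before any API requests are
# made), so the `logging: level` value in x_config.yaml takes effect:
#
#     setup_logging_from_config("x_config.yaml")
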
50# X-specific file paths
51X_QUEUE_DIR = Path("x_queue")
52X_CACHE_DIR = Path("x_cache")
53X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
54X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
55X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")
56
57class XClient:
58 """X (Twitter) API client for fetching mentions and managing interactions."""
59
    def __init__(self, api_key: str, user_id: str, access_token: Optional[str] = None,
                 consumer_key: Optional[str] = None, consumer_secret: Optional[str] = None,
                 access_token_secret: Optional[str] = None):
63 self.api_key = api_key
64 self.access_token = access_token
65 self.user_id = user_id
66 self.base_url = "https://api.x.com/2"
67
68 # Check if we have OAuth 1.0a credentials
69 if (consumer_key and consumer_secret and access_token and access_token_secret):
70 # Use OAuth 1.0a for User Context
71 self.oauth = OAuth1(
72 consumer_key,
73 client_secret=consumer_secret,
74 resource_owner_key=access_token,
75 resource_owner_secret=access_token_secret
76 )
77 self.headers = {"Content-Type": "application/json"}
78 self.auth_method = "oauth1a"
79 logger.info("Using OAuth 1.0a User Context authentication for X API")
80 elif access_token:
81 # Use OAuth 2.0 Bearer token for User Context
82 self.oauth = None
83 self.headers = {
84 "Authorization": f"Bearer {access_token}",
85 "Content-Type": "application/json"
86 }
87 self.auth_method = "oauth2_user"
88 logger.info("Using OAuth 2.0 User Context access token for X API")
89 else:
90 # Use Application-Only Bearer token
91 self.oauth = None
92 self.headers = {
93 "Authorization": f"Bearer {api_key}",
94 "Content-Type": "application/json"
95 }
96 self.auth_method = "bearer"
97 logger.info("Using Application-Only Bearer token for X API")
98
99 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None, max_retries: int = 3) -> Optional[Dict]:
100 """Make a request to the X API with proper error handling and exponential backoff."""
101 url = f"{self.base_url}{endpoint}"
102
103 # Log the specific API call being made
104 logger.debug(f"Making X API request: {method} {endpoint}")
105
106 for attempt in range(max_retries):
107 try:
108 if method.upper() == "GET":
109 if self.oauth:
110 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
111 else:
112 response = requests.get(url, headers=self.headers, params=params)
113 elif method.upper() == "POST":
114 if self.oauth:
115 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
116 else:
117 response = requests.post(url, headers=self.headers, json=data)
118 else:
119 raise ValueError(f"Unsupported HTTP method: {method}")
120
121 response.raise_for_status()
122 return response.json()
123
124 except requests.exceptions.HTTPError as e:
125 if response.status_code == 401:
126 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
127 logger.error(f"Response: {response.text}")
128 return None # Don't retry auth failures
129 elif response.status_code == 403:
130 logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
131 logger.error(f"Response: {response.text}")
132 return None # Don't retry permission failures
133 elif response.status_code == 429:
134 if attempt < max_retries - 1:
135 # Exponential backoff: 60s, 120s, 240s
136 backoff_time = 60 * (2 ** attempt)
137 logger.warning(f"X API rate limit exceeded on {method} {endpoint} (attempt {attempt + 1}/{max_retries}) - waiting {backoff_time}s before retry")
138 logger.error(f"Response: {response.text}")
139 time.sleep(backoff_time)
140 continue
141 else:
142 logger.error(f"X API rate limit exceeded on {method} {endpoint} - max retries reached")
143 logger.error(f"Response: {response.text}")
144 raise XRateLimitError(f"X API rate limit exceeded on {method} {endpoint}")
145 else:
146 if attempt < max_retries - 1:
147 # Exponential backoff for other HTTP errors too
148 backoff_time = 30 * (2 ** attempt)
149 logger.warning(f"X API request failed (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
150 logger.error(f"Response: {response.text}")
151 time.sleep(backoff_time)
152 continue
153 else:
154 logger.error(f"X API request failed after {max_retries} attempts: {e}")
155 logger.error(f"Response: {response.text}")
156 return None
157
158 except Exception as e:
159 if attempt < max_retries - 1:
160 backoff_time = 15 * (2 ** attempt)
161 logger.warning(f"Unexpected error making X API request (attempt {attempt + 1}/{max_retries}): {e} - retrying in {backoff_time}s")
162 time.sleep(backoff_time)
163 continue
164 else:
165 logger.error(f"Unexpected error making X API request after {max_retries} attempts: {e}")
166 return None
167
168 return None
169
    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> List[Dict]:
        """
        Fetch mentions for the configured user.

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            List of mention objects (empty if none were found or the request failed)
        """
181 endpoint = f"/users/{self.user_id}/mentions"
182 params = {
183 "max_results": min(max(max_results, 5), 100), # Ensure within API limits
184 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
185 "user.fields": "id,name,username",
186 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
187 }
188
189 if since_id:
190 params["since_id"] = since_id
191
192 logger.info(f"Fetching mentions for user {self.user_id}")
193 response = self._make_request(endpoint, params)
194
195 if response:
196 logger.debug(f"X API response: {response}")
197
198 if response and "data" in response:
199 mentions = response["data"]
200 logger.info(f"Retrieved {len(mentions)} mentions")
201 return mentions
202 else:
203 if response:
204 logger.info(f"No mentions in response. Full response: {response}")
205 else:
206 logger.warning("Request failed - no response received")
207 return []
208
    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user by ID.

        Named get_user_by_id to avoid clashing with get_user_info(), defined
        later in this class, which returns the authenticated user's own profile.
        """
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None
218
    def search_mentions(self, username: str, max_results: int = 10, since_id: Optional[str] = None) -> List[Dict]:
        """
        Search for mentions using the search endpoint instead of the mentions endpoint.
        This may have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username (empty if none found or the request failed)
        """
232 endpoint = "/tweets/search/recent"
233
234 # Search for mentions of the username
235 query = f"@{username}"
236
237 params = {
238 "query": query,
239 "max_results": min(max(max_results, 10), 100),
240 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
241 "user.fields": "id,name,username",
242 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
243 }
244
245 if since_id:
246 params["since_id"] = since_id
247
248 logger.info(f"Searching for mentions of @{username}")
249 response = self._make_request(endpoint, params)
250
251 if response and "data" in response:
252 tweets = response["data"]
253 logger.info(f"Found {len(tweets)} mentions via search")
254 return tweets
255 else:
256 if response:
257 logger.info(f"No mentions found via search. Response: {response}")
258 else:
259 logger.warning("Search request failed")
260 return []
261
    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as upper bound (excludes posts after this ID)

        Returns:
            Dict with 'tweets' (ordered chronologically) and 'users' keys,
            or None if no tweets could be found
        """
274 # Check cache first if enabled
275 if use_cache:
276 cached_data = get_cached_thread_context(conversation_id)
277 if cached_data:
278 return cached_data
279
280 # First, get the original tweet directly since it might not appear in conversation search
281 logger.debug(f"Getting thread context for conversation {conversation_id}")
282 original_tweet = None
283 try:
284 endpoint = f"/tweets/{conversation_id}"
285 params = {
286 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
287 "user.fields": "id,name,username",
288 "expansions": "author_id"
289 }
290 logger.debug(f"Fetching original tweet: GET {endpoint}")
291 response = self._make_request(endpoint, params)
292 if response and "data" in response:
293 original_tweet = response["data"]
294 logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
295 except Exception as e:
296 logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")
297
298 # Then search for all tweets in this conversation
299 endpoint = "/tweets/search/recent"
300 params = {
301 "query": f"conversation_id:{conversation_id}",
302 "max_results": 100, # Get as many as possible
303 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
304 "user.fields": "id,name,username",
305 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
306 "sort_order": "recency" # Get newest first, we'll reverse later
307 }
308
309 # Add until_id parameter to exclude tweets after the mention being processed
310 if until_id:
311 params["until_id"] = until_id
312 logger.info(f"Using until_id={until_id} to exclude future tweets")
313
314 logger.info(f"Fetching thread context for conversation {conversation_id}")
315 logger.debug(f"Searching conversation: GET {endpoint} with query={params['query']}")
316 response = self._make_request(endpoint, params)
317
318 tweets = []
319 users_data = {}
320
321 # Collect tweets from search
322 if response and "data" in response:
323 tweets.extend(response["data"])
324 # Store user data for reference
325 if "includes" in response and "users" in response["includes"]:
326 for user in response["includes"]["users"]:
327 users_data[user["id"]] = user
328
329 # Add original tweet if we got it and it's not already in the list
330 if original_tweet:
331 tweet_ids = [t.get('id') for t in tweets]
332 if original_tweet.get('id') not in tweet_ids:
333 tweets.append(original_tweet)
334 logger.info("Added original tweet to thread context")
335
336 # Attempt to fill gaps by fetching referenced tweets that are missing
337 # This helps with X API's incomplete conversation search results
338 tweet_ids = set(t.get('id') for t in tweets)
339 missing_tweet_ids = set()
340 critical_missing_ids = set()
341
342 # Collect referenced tweet IDs, prioritizing critical ones
343 for tweet in tweets:
344 referenced_tweets = tweet.get('referenced_tweets', [])
345 for ref in referenced_tweets:
346 ref_id = ref.get('id')
347 ref_type = ref.get('type')
348 if ref_id and ref_id not in tweet_ids:
349 missing_tweet_ids.add(ref_id)
350 # Prioritize direct replies and quoted tweets over retweets
351 if ref_type in ['replied_to', 'quoted']:
352 critical_missing_ids.add(ref_id)
353
354 # For rate limit efficiency, only fetch critical missing tweets if we have many
355 if len(missing_tweet_ids) > 10:
356 logger.info(f"Many missing tweets ({len(missing_tweet_ids)}), prioritizing {len(critical_missing_ids)} critical ones")
357 missing_tweet_ids = critical_missing_ids
358
359 # Context sufficiency check - skip backfill if we already have enough context
360 if has_sufficient_context(tweets, missing_tweet_ids):
361 logger.info("Thread has sufficient context, skipping missing tweet backfill")
362 missing_tweet_ids = set()
363
364 # Fetch missing referenced tweets in batches (more rate-limit friendly)
365 if missing_tweet_ids:
366 missing_list = list(missing_tweet_ids)
367
368 # First, check cache for missing tweets
369 cached_tweets = get_cached_tweets(missing_list)
370 for tweet_id, cached_tweet in cached_tweets.items():
371 if cached_tweet.get('conversation_id') == conversation_id:
372 tweets.append(cached_tweet)
373 tweet_ids.add(tweet_id)
374 logger.info(f"Retrieved missing tweet from cache: {tweet_id}")
375
376 # Add user data if available in cache
377 if cached_tweet.get('author_info'):
378 author_id = cached_tweet.get('author_id')
379 if author_id:
380 users_data[author_id] = cached_tweet['author_info']
381
382 # Only fetch tweets that weren't found in cache
383 uncached_ids = [tid for tid in missing_list if tid not in cached_tweets]
384
385 if uncached_ids:
386 batch_size = 100 # X API limit for bulk tweet lookup
387
388 for i in range(0, len(uncached_ids), batch_size):
389 batch_ids = uncached_ids[i:i + batch_size]
390 try:
391 endpoint = "/tweets"
392 params = {
393 "ids": ",".join(batch_ids),
394 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
395 "user.fields": "id,name,username",
396 "expansions": "author_id"
397 }
398 logger.debug(f"Batch fetching missing tweets: GET {endpoint} (ids: {len(batch_ids)} tweets)")
399 response = self._make_request(endpoint, params)
400
401 if response and "data" in response:
402 fetched_tweets = []
403 batch_users_data = {}
404
405 for missing_tweet in response["data"]:
406 # Only add if it's actually part of this conversation
407 if missing_tweet.get('conversation_id') == conversation_id:
408 tweets.append(missing_tweet)
409 tweet_ids.add(missing_tweet.get('id'))
410 fetched_tweets.append(missing_tweet)
411 logger.info(f"Retrieved missing referenced tweet: {missing_tweet.get('id')}")
412
413 # Add user data if available
414 if "includes" in response and "users" in response["includes"]:
415 for user in response["includes"]["users"]:
416 users_data[user["id"]] = user
417 batch_users_data[user["id"]] = user
418
419 # Cache the newly fetched tweets
420 if fetched_tweets:
421 save_cached_tweets(fetched_tweets, batch_users_data)
422
423 logger.info(f"Batch fetched {len(response['data'])} missing tweets from {len(batch_ids)} requested")
424
425 # Handle partial success - log any missing tweets that weren't found
426 if response and "errors" in response:
427 for error in response["errors"]:
428 logger.warning(f"Could not fetch tweet {error.get('resource_id')}: {error.get('title')}")
429
430 except Exception as e:
431 logger.warning(f"Could not fetch batch of missing tweets {batch_ids[:3]}...: {e}")
432 else:
433 logger.info(f"All {len(missing_list)} missing tweets found in cache")
434
435 if tweets:
436 # Filter out tweets that occur after until_id (if specified)
437 if until_id:
438 original_count = len(tweets)
439 # Convert until_id to int for comparison (Twitter IDs are sequential)
440 until_id_int = int(until_id)
441 tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
442 filtered_count = len(tweets)
443 if original_count != filtered_count:
444 logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")
445
446 # Sort chronologically (oldest first)
447 tweets.sort(key=lambda x: x.get('created_at', ''))
448 logger.info(f"Retrieved {len(tweets)} tweets in thread")
449
450 thread_data = {"tweets": tweets, "users": users_data}
451
452 # Cache individual tweets from the thread for future backfill
453 save_cached_tweets(tweets, users_data)
454
455 # Cache the result
456 if use_cache:
457 save_cached_thread_context(conversation_id, thread_data)
458
459 return thread_data
460 else:
461 logger.warning("No tweets found for thread context")
462 return None
463
464 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
465 """
466 Post a reply to a specific tweet.
467
468 Args:
469 reply_text: The text content of the reply
470 in_reply_to_tweet_id: The ID of the tweet to reply to
471
472 Returns:
473 Response data if successful, None if failed
474 """
475 endpoint = "/tweets"
476
477 payload = {
478 "text": reply_text,
479 "reply": {
480 "in_reply_to_tweet_id": in_reply_to_tweet_id
481 }
482 }
483
484 logger.info(f"Attempting to post reply with {self.auth_method} authentication")
485 logger.debug(f"Posting reply: POST {endpoint}")
486 result = self._make_request(endpoint, method="POST", data=payload)
487
488 if result:
489 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
490 return result
491 else:
492 logger.error("Failed to post reply")
493 return None
494
495 def post_tweet(self, tweet_text: str) -> Optional[Dict]:
496 """
497 Post a standalone tweet (not a reply).
498
499 Args:
500 tweet_text: The text content of the tweet (max 280 characters)
501
502 Returns:
503 Response data if successful, None if failed
504 """
505 endpoint = "/tweets"
506
507 # Validate tweet length
508 if len(tweet_text) > 280:
509 logger.error(f"Tweet text too long: {len(tweet_text)} characters (max 280)")
510 return None
511
512 payload = {
513 "text": tweet_text
514 }
515
516 logger.info(f"Attempting to post tweet with {self.auth_method} authentication")
517 logger.debug(f"Posting tweet: POST {endpoint}")
518 result = self._make_request(endpoint, method="POST", data=payload)
519
520 if result:
521 logger.info(f"Successfully posted tweet")
522 return result
523 else:
524 logger.error("Failed to post tweet")
525 return None
526
527 def get_user_info(self, fields: Optional[str] = None) -> Optional[Dict]:
528 """
529 Get the authenticated user's information, using cached data when available.
530 This reduces API calls significantly since user info rarely changes.
531
532 Args:
533 fields: Optional comma-separated list of user fields to fetch
534
535 Returns:
536 User data dict if successful, None if failed
537 """
538 # First try to get from cache
539 cached_user_info = get_cached_user_info()
540 if cached_user_info:
541 # Check if cached data has all requested fields
542 requested_fields = set(fields.split(',') if fields else ['id', 'username', 'name'])
543 cached_fields = set(cached_user_info.keys())
544 if requested_fields.issubset(cached_fields):
545 return cached_user_info
546
547 # Cache miss, expired, or missing requested fields - fetch from API
548 logger.debug("Fetching fresh user info from /users/me API")
549 endpoint = "/users/me"
550 params = {"user.fields": fields or "id,username,name,description"}
551
552 response = self._make_request(endpoint, params=params)
553 if response and "data" in response:
554 user_data = response["data"]
555 # Cache the result for future use
556 save_cached_user_info(user_data)
557 return user_data
558 else:
559 logger.error("Failed to get user info from /users/me API")
560 return None
561
562 def get_username(self) -> Optional[str]:
563 """
564 Get the authenticated user's username, using cached data when available.
565 This reduces API calls significantly since username rarely changes.
566
567 Returns:
568 Username string if successful, None if failed
569 """
570 user_info = self.get_user_info("id,username,name")
571 return user_info.get("username") if user_info else None
572
573def load_x_config(config_path: str = "x_config.yaml") -> Dict[str, Any]:
574 """Load complete X configuration from x_config.yaml."""
575 try:
576 with open(config_path, 'r') as f:
577 config = yaml.safe_load(f)
578
579 if not config:
580 raise ValueError(f"Empty or invalid configuration file: {config_path}")
581
582 # Validate required sections
583 x_config = config.get('x', {})
584 letta_config = config.get('letta', {})
585
586 if not x_config.get('api_key') or not x_config.get('user_id'):
587 raise ValueError("X API key and user_id must be configured in x_config.yaml")
588
589 if not letta_config.get('api_key') or not letta_config.get('agent_id'):
590 raise ValueError("Letta API key and agent_id must be configured in x_config.yaml")
591
592 return config
593 except Exception as e:
594 logger.error(f"Failed to load X configuration: {e}")
595 raise
596
597def get_x_letta_config(config_path: str = "x_config.yaml") -> Dict[str, Any]:
598 """Get Letta configuration from X config file."""
599 config = load_x_config(config_path)
600 return config['letta']
601
602def create_x_client(config_path: str = "x_config.yaml") -> XClient:
603 """Create and return an X client with configuration loaded from file."""
604 config = load_x_config(config_path)
605 x_config = config['x']
606 return XClient(
607 api_key=x_config['api_key'],
608 user_id=x_config['user_id'],
609 access_token=x_config.get('access_token'),
610 consumer_key=x_config.get('consumer_key'),
611 consumer_secret=x_config.get('consumer_secret'),
612 access_token_secret=x_config.get('access_token_secret')
613 )
614
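# Illustrative x_config.yaml layout (an assumption based only on the keys this
# module reads; values below are placeholders, not real credentials):
#
#   x:
#     api_key: "APP_BEARER_TOKEN"
#     user_id: "1234567890"
#     access_token: "USER_ACCESS_TOKEN"          # optional (OAuth 2.0 user context)
#     consumer_key: "CONSUMER_KEY"               # optional (OAuth 1.0a)
#     consumer_secret: "CONSUMER_SECRET"         # optional (OAuth 1.0a)
#     access_token_secret: "ACCESS_TOKEN_SECRET" # optional (OAuth 1.0a)
#   letta:
#     api_key: "LETTA_API_KEY"
#     agent_id: "agent-..."
#     timeout: 600
#   logging:
#     level: "INFO"
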
615def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
616 """
617 Convert a mention object to a YAML string for better AI comprehension.
618 Similar to thread_to_yaml_string in bsky_utils.py
619 """
620 # Extract relevant fields
621 simplified_mention = {
622 'id': mention.get('id'),
623 'text': mention.get('text'),
624 'author_id': mention.get('author_id'),
625 'created_at': mention.get('created_at'),
626 'in_reply_to_user_id': mention.get('in_reply_to_user_id')
627 }
628
629 # Add user information if available
630 if users_data and mention.get('author_id') in users_data:
631 user = users_data[mention.get('author_id')]
632 simplified_mention['author'] = {
633 'username': user.get('username'),
634 'name': user.get('name')
635 }
636
637 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)
638
639def thread_to_yaml_string(thread_data: Dict) -> str:
640 """
641 Convert X thread context to YAML string for AI comprehension.
642 Similar to Bluesky's thread_to_yaml_string function.
643
644 Args:
645 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
646
647 Returns:
648 YAML string representation of the thread
649 """
650 if not thread_data or "tweets" not in thread_data:
651 return "conversation: []\n"
652
653 tweets = thread_data["tweets"]
654 users_data = thread_data.get("users", {})
655
656 simplified_thread = {
657 "conversation": []
658 }
659
660 for tweet in tweets:
661 # Get user info
662 author_id = tweet.get('author_id')
663 author_info = {}
664 if author_id and author_id in users_data:
665 user = users_data[author_id]
666 author_info = {
667 'username': user.get('username'),
668 'name': user.get('name')
669 }
670
671 # Build tweet object (simplified for AI consumption)
672 tweet_obj = {
673 'text': tweet.get('text'),
674 'created_at': tweet.get('created_at'),
675 'author': author_info,
676 'author_id': author_id # Include user ID for block management
677 }
678
679 simplified_thread["conversation"].append(tweet_obj)
680
681 return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)
682
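# Rough shape of the YAML produced by thread_to_yaml_string() (illustrative
# values; field order follows the dict construction above):
#
#   conversation:
#   - text: "@void what do you think?"
#     created_at: '2025-01-01T12:00:00.000Z'
#     author:
#       username: someuser
#       name: Some User
#     author_id: '1234567890'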
683
684def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
685 """
686 Ensure all users in the thread have their X user blocks attached.
687 Creates blocks with initial content including their handle if they don't exist.
688
689 Args:
690 thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
691 agent_id: The Letta agent ID to attach blocks to (defaults to config agent_id)
692 """
693 if not thread_data or "users" not in thread_data:
694 return
695
696 try:
697 from tools.blocks import attach_x_user_blocks, x_user_note_set
698 from letta_client import Letta
699
700 # Get Letta client and agent_id from X config
701 config = get_x_letta_config()
702 client = Letta(token=config['api_key'], timeout=config['timeout'])
703
704 # Use provided agent_id or get from config
705 if agent_id is None:
706 agent_id = config['agent_id']
707
708 # Get agent info to create a mock agent_state for the functions
709 class MockAgentState:
710 def __init__(self, agent_id):
711 self.id = agent_id
712
713 agent_state = MockAgentState(agent_id)
714
715 users_data = thread_data["users"]
716 user_ids = list(users_data.keys())
717
718 if not user_ids:
719 return
720
721 logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")
722
723 # Get current blocks to check which users already have blocks with content
724 current_blocks = client.agents.blocks.list(agent_id=agent_id)
725 existing_user_blocks = {}
726
727 for block in current_blocks:
728 if block.label.startswith("x_user_"):
729 user_id = block.label.replace("x_user_", "")
730 existing_user_blocks[user_id] = block
731
732 # Attach all user blocks (this will create missing ones with basic content)
733 attach_result = attach_x_user_blocks(user_ids, agent_state)
734 logger.info(f"X user block attachment result: {attach_result}")
735
736 # For newly created blocks, update with user handle information
737 for user_id in user_ids:
738 if user_id not in existing_user_blocks:
739 user_info = users_data[user_id]
740 username = user_info.get('username', 'unknown')
741 name = user_info.get('name', 'Unknown')
742
743 # Set initial content with handle information
744 initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."
745
746 try:
747 x_user_note_set(user_id, initial_content, agent_state)
748 logger.info(f"Set initial content for X user {user_id} (@{username})")
749 except Exception as e:
750 logger.error(f"Failed to set initial content for X user {user_id}: {e}")
751
752 except Exception as e:
753 logger.error(f"Error ensuring X user blocks: {e}")
754
755
756# X Caching and Queue System Functions
757
758def load_last_seen_id() -> Optional[str]:
759 """Load the last seen mention ID for incremental fetching."""
760 if X_LAST_SEEN_FILE.exists():
761 try:
762 with open(X_LAST_SEEN_FILE, 'r') as f:
763 data = json.load(f)
764 return data.get('last_seen_id')
765 except Exception as e:
766 logger.error(f"Error loading last seen ID: {e}")
767 return None
768
769def save_last_seen_id(mention_id: str):
770 """Save the last seen mention ID."""
771 try:
772 X_QUEUE_DIR.mkdir(exist_ok=True)
773 with open(X_LAST_SEEN_FILE, 'w') as f:
774 json.dump({
775 'last_seen_id': mention_id,
776 'updated_at': datetime.now().isoformat()
777 }, f)
778 logger.debug(f"Saved last seen ID: {mention_id}")
779 except Exception as e:
780 logger.error(f"Error saving last seen ID: {e}")
781
782def load_processed_mentions() -> set:
783 """Load the set of processed mention IDs."""
784 if X_PROCESSED_MENTIONS_FILE.exists():
785 try:
786 with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
787 data = json.load(f)
788 # Keep only recent entries (last 10000)
789 if len(data) > 10000:
790 data = data[-10000:]
791 save_processed_mentions(set(data))
792 return set(data)
793 except Exception as e:
794 logger.error(f"Error loading processed mentions: {e}")
795 return set()
796
797def save_processed_mentions(processed_set: set):
798 """Save the set of processed mention IDs."""
799 try:
800 X_QUEUE_DIR.mkdir(exist_ok=True)
801 with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
802 json.dump(list(processed_set), f)
803 except Exception as e:
804 logger.error(f"Error saving processed mentions: {e}")
805
806def load_downrank_users() -> Set[str]:
807 """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
808 try:
809 if not X_DOWNRANK_USERS_FILE.exists():
810 return set()
811
812 downrank_users = set()
813 with open(X_DOWNRANK_USERS_FILE, 'r') as f:
814 for line in f:
815 line = line.strip()
816 # Skip empty lines and comments
817 if line and not line.startswith('#'):
818 downrank_users.add(line)
819
820 logger.info(f"Loaded {len(downrank_users)} downrank users")
821 return downrank_users
822 except Exception as e:
823 logger.error(f"Error loading downrank users: {e}")
824 return set()
825
826def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
827 """
828 Check if we should respond to a downranked user.
829 Returns True 10% of the time for downranked users, True 100% of the time for others.
830 """
831 if user_id not in downrank_users:
832 return True
833
834 # 10% chance for downranked users
835 should_respond = random.random() < 0.1
836 logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
837 return should_respond
838
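# Expected x_downrank_users.txt format (assumption, inferred from the parser
# above): one numeric X user ID per line; blank lines and '#' comments ignored.
#
#   # users whose mentions we mostly skip
#   1234567890123456789
#   9876543210987654321
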
839def save_mention_to_queue(mention: Dict):
840 """Save a mention to the queue directory for async processing."""
841 try:
842 mention_id = mention.get('id')
843 if not mention_id:
844 logger.error("Mention missing ID, cannot queue")
845 return
846
847 # Check if already processed
848 processed_mentions = load_processed_mentions()
849 if mention_id in processed_mentions:
850 logger.debug(f"Mention {mention_id} already processed, skipping")
851 return
852
853 # Create queue directory
854 X_QUEUE_DIR.mkdir(exist_ok=True)
855
856 # Create filename using hash (similar to Bluesky system)
857 mention_str = json.dumps(mention, sort_keys=True)
858 mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
859 filename = f"x_mention_{mention_hash}.json"
860
861 queue_file = X_QUEUE_DIR / filename
862
863 # Save mention data with enhanced debugging information
864 mention_data = {
865 'mention': mention,
866 'queued_at': datetime.now().isoformat(),
867 'type': 'x_mention',
868 # Debug info for conversation tracking
869 'debug_info': {
870 'mention_id': mention.get('id'),
871 'author_id': mention.get('author_id'),
872 'conversation_id': mention.get('conversation_id'),
873 'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
874 'referenced_tweets': mention.get('referenced_tweets', []),
875 'text_preview': mention.get('text', '')[:200],
876 'created_at': mention.get('created_at'),
877 'public_metrics': mention.get('public_metrics', {}),
878 'context_annotations': mention.get('context_annotations', [])
879 }
880 }
881
882 with open(queue_file, 'w') as f:
883 json.dump(mention_data, f, indent=2)
884
885 logger.info(f"Queued X mention {mention_id} -> {filename}")
886
887 except Exception as e:
888 logger.error(f"Error saving mention to queue: {e}")
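
# Sketch of a queued mention file written by save_mention_to_queue()
# (illustrative values; the filename hash is the first 16 hex chars of the
# SHA-256 of the mention JSON):
#
#   x_queue/x_mention_0123456789abcdef.json
#   {
#     "mention": { ... raw mention object ... },
#     "queued_at": "2025-01-01T12:00:00",
#     "type": "x_mention",
#     "debug_info": { "mention_id": "...", "conversation_id": "...", ... }
#   }
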
889
890# X Cache Functions
891def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
892 """Load cached thread context if available."""
893 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
894 if cache_file.exists():
895 try:
896 with open(cache_file, 'r') as f:
897 cached_data = json.load(f)
898 # Check if cache is recent (within 1 hour)
899 from datetime import datetime, timedelta
900 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
901 if datetime.now() - cached_time < timedelta(hours=1):
902 logger.info(f"Using cached thread context for {conversation_id}")
903 return cached_data.get('thread_data')
904 except Exception as e:
905 logger.warning(f"Error loading cached thread context: {e}")
906 return None
907
908def save_cached_thread_context(conversation_id: str, thread_data: Dict):
909 """Save thread context to cache."""
910 try:
911 X_CACHE_DIR.mkdir(exist_ok=True)
912 cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
913
914 cache_data = {
915 'conversation_id': conversation_id,
916 'thread_data': thread_data,
917 'cached_at': datetime.now().isoformat()
918 }
919
920 with open(cache_file, 'w') as f:
921 json.dump(cache_data, f, indent=2)
922
923 logger.debug(f"Cached thread context for {conversation_id}")
924 except Exception as e:
925 logger.error(f"Error caching thread context: {e}")
926
927def get_cached_tweets(tweet_ids: List[str]) -> Dict[str, Dict]:
928 """
929 Load cached individual tweets if available.
930 Returns dict mapping tweet_id -> tweet_data for found tweets.
931 """
932 cached_tweets = {}
933
934 for tweet_id in tweet_ids:
935 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
936 if cache_file.exists():
937 try:
938 with open(cache_file, 'r') as f:
939 cached_data = json.load(f)
940
941 # Use longer cache times for older tweets (24 hours vs 1 hour)
942 from datetime import datetime, timedelta
943 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
944 tweet_created = cached_data.get('tweet_data', {}).get('created_at', '')
945
946 # Parse tweet creation time to determine age
947 try:
948 from dateutil.parser import parse
949 tweet_age = datetime.now() - parse(tweet_created)
950 cache_duration = timedelta(hours=24) if tweet_age > timedelta(hours=24) else timedelta(hours=1)
951 except:
952 cache_duration = timedelta(hours=1) # Default to 1 hour if parsing fails
953
954 if datetime.now() - cached_time < cache_duration:
955 cached_tweets[tweet_id] = cached_data.get('tweet_data')
956 logger.debug(f"Using cached tweet {tweet_id}")
957
958 except Exception as e:
959 logger.warning(f"Error loading cached tweet {tweet_id}: {e}")
960
961 return cached_tweets
962
963def save_cached_tweets(tweets_data: List[Dict], users_data: Dict[str, Dict] = None):
964 """Save individual tweets to cache for future reuse."""
965 try:
966 X_CACHE_DIR.mkdir(exist_ok=True)
967
968 for tweet in tweets_data:
969 tweet_id = tweet.get('id')
970 if not tweet_id:
971 continue
972
973 cache_file = X_CACHE_DIR / f"tweet_{tweet_id}.json"
974
975 # Include user data if available
976 tweet_with_user = tweet.copy()
977 if users_data and tweet.get('author_id') in users_data:
978 tweet_with_user['author_info'] = users_data[tweet.get('author_id')]
979
980 cache_data = {
981 'tweet_id': tweet_id,
982 'tweet_data': tweet_with_user,
983 'cached_at': datetime.now().isoformat()
984 }
985
986 with open(cache_file, 'w') as f:
987 json.dump(cache_data, f, indent=2)
988
989 logger.debug(f"Cached individual tweet {tweet_id}")
990
991 except Exception as e:
992 logger.error(f"Error caching individual tweets: {e}")
993
994def get_cached_user_info() -> Optional[Dict]:
995 """Load cached user info if available and not expired."""
996 cache_file = X_CACHE_DIR / "user_info.json"
997 if cache_file.exists():
998 try:
999 with open(cache_file, 'r') as f:
1000 cached_data = json.load(f)
1001 # Check if cache is recent (within 24 hours)
1002 from datetime import datetime, timedelta
1003 cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
1004 if datetime.now() - cached_time < timedelta(hours=24):
1005 logger.debug("Using cached user info")
1006 return cached_data.get('data')
1007 else:
1008 logger.debug("Cached user info expired (>24 hours old)")
1009 except Exception as e:
1010 logger.warning(f"Error loading cached user info: {e}")
1011 return None
1012
1013def save_cached_user_info(user_data: Dict):
1014 """Save user info to cache."""
1015 try:
1016 X_CACHE_DIR.mkdir(exist_ok=True)
1017 cache_file = X_CACHE_DIR / "user_info.json"
1018
1019 from datetime import datetime
1020 cache_data = {
1021 'data': user_data,
1022 'cached_at': datetime.now().isoformat()
1023 }
1024
1025 with open(cache_file, 'w') as f:
1026 json.dump(cache_data, f, indent=2)
1027
1028 logger.debug(f"Cached user info: {user_data.get('username')}")
1029
1030 except Exception as e:
1031 logger.error(f"Error caching user info: {e}")
1032
1033def has_sufficient_context(tweets: List[Dict], missing_tweet_ids: Set[str]) -> bool:
1034 """
1035 Determine if we have sufficient context to skip backfilling missing tweets.
1036
1037 Args:
1038 tweets: List of tweets already in the thread
1039 missing_tweet_ids: Set of missing tweet IDs we'd like to fetch
1040
1041 Returns:
1042 True if context is sufficient, False if backfill is needed
1043 """
1044 # If no missing tweets, context is sufficient
1045 if not missing_tweet_ids:
1046 return True
1047
1048 # If we have a substantial conversation (5+ tweets), likely sufficient
1049 if len(tweets) >= 5:
1050 logger.debug(f"Thread has {len(tweets)} tweets, considering sufficient")
1051 return True
1052
1053 # If only a few missing tweets and we have some context, might be enough
1054 if len(missing_tweet_ids) <= 2 and len(tweets) >= 3:
1055 logger.debug(f"Only {len(missing_tweet_ids)} missing tweets with {len(tweets)} existing, considering sufficient")
1056 return True
1057
1058 # Check if we have conversational flow (mentions between users)
1059 has_conversation_flow = False
1060 for tweet in tweets:
1061 text = tweet.get('text', '').lower()
1062 # Look for mentions, replies, or conversational indicators
1063 if '@' in text or 'reply' in text or len([t for t in tweets if t.get('author_id') != tweet.get('author_id')]) > 1:
1064 has_conversation_flow = True
1065 break
1066
1067 # If we have clear conversational flow and reasonable length, sufficient
1068 if has_conversation_flow and len(tweets) >= 2:
1069 logger.debug("Thread has conversational flow, considering sufficient")
1070 return True
1071
1072 # Otherwise, we need to backfill
1073 logger.debug(f"Context insufficient: {len(tweets)} tweets, {len(missing_tweet_ids)} missing, no clear flow")
1074 return False
1075
1076def fetch_and_queue_mentions(username: str) -> int:
1077 """
1078 Single-pass function to fetch new mentions and queue them.
1079 Returns number of new mentions found.
1080 """
1081 try:
1082 client = create_x_client()
1083
1084 # Load last seen ID for incremental fetching
1085 last_seen_id = load_last_seen_id()
1086
1087 logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")
1088
1089 # Search for mentions - this calls GET /2/tweets/search/recent
1090 logger.debug(f"Calling search_mentions API for @{username}")
1091 mentions = client.search_mentions(
1092 username=username,
1093 since_id=last_seen_id,
1094 max_results=100 # Get as many as possible
1095 )
1096
1097 if not mentions:
1098 logger.info("No new mentions found")
1099 return 0
1100
1101 # Process mentions (newest first, so reverse to process oldest first)
1102 mentions.reverse()
1103 new_count = 0
1104
1105 for mention in mentions:
1106 save_mention_to_queue(mention)
1107 new_count += 1
1108
1109 # Update last seen ID to the most recent mention
1110 if mentions:
1111 most_recent_id = mentions[-1]['id'] # Last after reverse = most recent
1112 save_last_seen_id(most_recent_id)
1113
1114 logger.info(f"Queued {new_count} new X mentions")
1115 return new_count
1116
1117 except Exception as e:
1118 logger.error(f"Error fetching and queuing mentions: {e}")
1119 return 0
1120
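# Illustrative polling loop (an assumption about how fetch_and_queue_mentions
# might be driven; nothing in this module calls it):
def example_mention_poll_loop(poll_seconds: int = 120) -> None:
    """Sketch: resolve our username once, then fetch-and-queue mentions forever."""
    client = create_x_client()
    username = client.get_username()
    if not username:
        logger.error("Could not resolve authenticated username; aborting poll loop")
        return
    while True:
        queued = fetch_and_queue_mentions(username)
        logger.info(f"Poll cycle complete: queued {queued} new mentions")
        time.sleep(poll_seconds)
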
1121# Simple test function
1122def get_my_user_info():
1123 """Get the authenticated user's information to find correct user ID."""
1124 try:
1125 client = create_x_client()
1126
1127 # Get authenticated user info using cached method
1128 print("Fetching authenticated user information...")
1129 user_data = client.get_user_info("id,name,username,description")
1130
1131 if user_data:
1132 print(f"✅ Found authenticated user:")
1133 print(f" ID: {user_data.get('id')}")
1134 print(f" Username: @{user_data.get('username')}")
1135 print(f" Name: {user_data.get('name')}")
1136 print(f" Description: {user_data.get('description', 'N/A')[:100]}...")
1137 print(f"\n🔧 Update your x_config.yaml with:")
1138 print(f" user_id: \"{user_data.get('id')}\"")
1139 return user_data
        else:
            print("❌ Failed to get user information")
            return None
1144
1145 except Exception as e:
1146 print(f"Error getting user info: {e}")
1147 return None
1148
1149def test_search_mentions():
1150 """Test the search-based mention detection."""
1151 try:
1152 client = create_x_client()
1153
1154 # First get our username
1155 username = client.get_username()
1156 if not username:
1157 print("❌ Could not get username")
1158 return
1159 print(f"🔍 Searching for mentions of @{username}")
1160
1161 mentions = client.search_mentions(username, max_results=5)
1162
1163 if mentions:
1164 print(f"✅ Found {len(mentions)} mentions via search:")
1165 for mention in mentions:
1166 print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
1167 else:
1168 print("No mentions found via search")
1169
1170 except Exception as e:
1171 print(f"Search test failed: {e}")
1172
1173def test_fetch_and_queue():
1174 """Test the single-pass fetch and queue function."""
1175 try:
1176 client = create_x_client()
1177
1178 # Get our username
1179 username = client.get_username()
1180 if not username:
1181 print("❌ Could not get username")
1182 return
1183 print(f"🔄 Fetching and queueing mentions for @{username}")
1184
1185 # Show current state
1186 last_seen = load_last_seen_id()
1187 print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")
1188
1189 # Fetch and queue
1190 new_count = fetch_and_queue_mentions(username)
1191
1192 if new_count > 0:
1193 print(f"✅ Queued {new_count} new mentions")
1194 print(f"📁 Check ./x_queue/ directory for queued mentions")
1195
1196 # Show updated state
1197 new_last_seen = load_last_seen_id()
1198 print(f"📍 Updated last seen ID: {new_last_seen}")
1199 else:
1200 print("ℹ️ No new mentions to queue")
1201
1202 except Exception as e:
1203 print(f"Fetch and queue test failed: {e}")
1204
1205def test_thread_context():
1206 """Test thread context retrieval from a queued mention."""
1207 try:
1208 import json
1209
1210 # Find a queued mention file
1211 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1212 if not queue_files:
1213 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1214 return
1215
1216 # Read the first mention
1217 mention_file = queue_files[0]
1218 with open(mention_file, 'r') as f:
1219 mention_data = json.load(f)
1220
1221 mention = mention_data['mention']
1222 print(f"📄 Using mention: {mention.get('id')}")
1223 print(f"📝 Text: {mention.get('text')}")
1224
1225 # Check if it has a conversation_id
1226 conversation_id = mention.get('conversation_id')
1227 if not conversation_id:
1228 print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
1229 return
1230
1231 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1232
1233 # Get thread context
1234 client = create_x_client()
1235 thread_data = client.get_thread_context(conversation_id)
1236
1237 if thread_data:
1238 tweets = thread_data.get('tweets', [])
1239 print(f"✅ Retrieved thread with {len(tweets)} tweets")
1240
1241 # Convert to YAML
1242 yaml_thread = thread_to_yaml_string(thread_data)
1243
1244 # Save thread context for inspection
1245 thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
1246 with open(thread_file, 'w') as f:
1247 f.write(yaml_thread)
1248
1249 print(f"💾 Saved thread context to: {thread_file}")
1250 print("\n📋 Thread preview:")
1251 print(yaml_thread)
1252 else:
1253 print("❌ Failed to retrieve thread context")
1254
1255 except Exception as e:
1256 print(f"Thread context test failed: {e}")
1257
1258def test_letta_integration(agent_id: str = None):
1259 """Test sending X thread context to Letta agent."""
1260 try:
1261 from letta_client import Letta
1262 import json
1263 import yaml
1264
1265 # Load X config to access letta section
1266 try:
1267 x_config = load_x_config()
1268 letta_config = x_config.get('letta', {})
1269 api_key = letta_config.get('api_key')
1270 config_agent_id = letta_config.get('agent_id')
1271
1272 # Use agent_id from config if not provided as parameter
1273 if not agent_id:
1274 if config_agent_id:
1275 agent_id = config_agent_id
1276 print(f"ℹ️ Using agent_id from config: {agent_id}")
1277 else:
1278 print("❌ No agent_id found in x_config.yaml")
1279 print("Expected config structure:")
1280 print(" letta:")
1281 print(" agent_id: your-agent-id")
1282 return
1283 else:
1284 print(f"ℹ️ Using provided agent_id: {agent_id}")
1285
1286 if not api_key:
1287 # Try loading from environment as fallback
1288 import os
1289 api_key = os.getenv('LETTA_API_KEY')
1290 if not api_key:
1291 print("❌ LETTA_API_KEY not found in x_config.yaml or environment")
1292 print("Expected config structure:")
1293 print(" letta:")
1294 print(" api_key: your-letta-api-key")
1295 return
1296 else:
1297 print("ℹ️ Using LETTA_API_KEY from environment")
1298 else:
1299 print("ℹ️ Using LETTA_API_KEY from x_config.yaml")
1300
1301 except Exception as e:
1302 print(f"❌ Error loading config: {e}")
1303 return
1304
1305 letta_client = Letta(token=api_key, timeout=600)
1306 print(f"🤖 Connected to Letta, using agent: {agent_id}")
1307
1308 # Find a queued mention file
1309 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1310 if not queue_files:
1311 print("❌ No queued mentions found. Run 'python x.py queue' first.")
1312 return
1313
1314 # Read the first mention
1315 mention_file = queue_files[0]
1316 with open(mention_file, 'r') as f:
1317 mention_data = json.load(f)
1318
1319 mention = mention_data['mention']
1320 conversation_id = mention.get('conversation_id')
1321
1322 if not conversation_id:
1323 print("❌ No conversation_id found in mention.")
1324 return
1325
1326 print(f"🧵 Getting thread context for conversation: {conversation_id}")
1327
1328 # Get thread context
1329 x_client = create_x_client()
1330 thread_data = x_client.get_thread_context(conversation_id)
1331
1332 if not thread_data:
1333 print("❌ Failed to retrieve thread context")
1334 return
1335
1336 # Convert to YAML
1337 yaml_thread = thread_to_yaml_string(thread_data)
1338
1339 # Create prompt for the agent
1340 prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.
1341
1342Here is the thread context:
1343
1344{yaml_thread}
1345
1346Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""
1347
1348 prompt_char_count = len(prompt)
1349 print(f"📤 Sending thread context to Letta agent... | prompt: {prompt_char_count} chars")
1350
1351 # Print the prompt in a rich panel
1352 rprint(Panel(prompt, title=f"Prompt ({prompt_char_count} chars)", border_style="blue"))
1353
1354 # Send to Letta agent using streaming
1355 message_stream = letta_client.agents.messages.create_stream(
1356 agent_id=agent_id,
1357 messages=[{"role": "user", "content": prompt}],
1358 stream_tokens=False,
1359 max_steps=10
1360 )
1361
1362 print("🔄 Streaming response from agent...")
1363 response_text = ""
1364
1365 for chunk in message_stream:
1366 print(chunk)
1367 if hasattr(chunk, 'message_type'):
1368 if chunk.message_type == 'assistant_message':
1369 print(f"🤖 Agent response: {chunk.content}")
1370 response_text = chunk.content
1371 elif chunk.message_type == 'reasoning_message':
1372 print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
1373 elif chunk.message_type == 'tool_call_message':
1374 print(f"🔧 Agent tool call: {chunk.tool_call.name}")
1375
1376 if response_text:
1377 print(f"\n✅ Agent generated response:")
1378 print(f"📝 Response: {response_text}")
1379 else:
1380 print("❌ No response generated by agent")
1381
1382 except Exception as e:
1383 print(f"Letta integration test failed: {e}")
1384 import traceback
1385 traceback.print_exc()
1386
1387def test_x_client():
1388 """Test the X client by fetching mentions."""
1389 try:
1390 client = create_x_client()
1391 mentions = client.get_mentions(max_results=5)
1392
1393 if mentions:
1394 print(f"Successfully retrieved {len(mentions)} mentions:")
1395 for mention in mentions:
1396 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
1397 else:
1398 print("No mentions retrieved")
1399
1400 except Exception as e:
1401 print(f"Test failed: {e}")
1402
1403def reply_to_cameron_post():
1404 """
1405 Reply to Cameron's specific X post.
1406
1407 NOTE: This requires OAuth User Context authentication, not Bearer token.
1408 Current Bearer token is Application-Only which can't post.
1409 """
1410 try:
1411 client = create_x_client()
1412
1413 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
1414 cameron_post_id = "1950690566909710618"
1415
1416 # Simple reply message
1417 reply_text = "Hello from void! 🤖 Testing X integration."
1418
1419 print(f"Attempting to reply to post {cameron_post_id}")
1420 print(f"Reply text: {reply_text}")
1421 print("\nNOTE: This will fail with current Bearer token (Application-Only)")
1422 print("Posting requires OAuth User Context authentication")
1423
1424 result = client.post_reply(reply_text, cameron_post_id)
1425
1426 if result:
1427 print(f"✅ Successfully posted reply!")
1428 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
1429 else:
1430 print("❌ Failed to post reply (expected with current auth)")
1431
1432 except Exception as e:
1433 print(f"Reply failed: {e}")
1434
1435def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
1436 """
1437 Process an X mention and generate a reply using the Letta agent.
1438 Similar to bsky.py process_mention but for X/Twitter.
1439
1440 Args:
1441 void_agent: The Letta agent instance
1442 x_client: The X API client
1443 mention_data: The mention data dictionary
1444 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
1445 testing_mode: If True, don't actually post to X
1446
1447 Returns:
1448 True: Successfully processed, remove from queue
1449 False: Failed but retryable, keep in queue
1450 None: Failed with non-retryable error, move to errors directory
1451 "no_reply": No reply was generated, move to no_reply directory
1452 """
1453 try:
1454 logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")
1455
1456 # Extract mention details
1457 if isinstance(mention_data, dict):
1458 # Handle both raw mention and queued mention formats
1459 if 'mention' in mention_data:
1460 mention = mention_data['mention']
1461 else:
1462 mention = mention_data
1463 else:
1464 mention = mention_data
1465
1466 mention_id = mention.get('id')
1467 mention_text = mention.get('text', '')
1468 author_id = mention.get('author_id')
1469 conversation_id = mention.get('conversation_id')
1470 in_reply_to_user_id = mention.get('in_reply_to_user_id')
1471 referenced_tweets = mention.get('referenced_tweets', [])
1472
1473 # Check downrank list - only respond to downranked users 10% of the time
1474 downrank_users = load_downrank_users()
1475 if not should_respond_to_downranked_user(str(author_id), downrank_users):
1476 logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
1477 return "no_reply"
1478
1479 # Enhanced conversation tracking for debug - especially important for Grok handling
1480 logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
1481 logger.info(f" Author ID: {author_id}")
1482 logger.info(f" Conversation ID: {conversation_id}")
1483 logger.info(f" In Reply To User ID: {in_reply_to_user_id}")
1484 logger.info(f" Referenced Tweets: {len(referenced_tweets)} items")
1485 for i, ref in enumerate(referenced_tweets[:3]): # Log first 3 referenced tweets
1486 logger.info(f" Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
1487 logger.info(f" Text preview: {mention_text[:100]}...")
1488
1489 if not conversation_id:
1490 logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
1491 return None
1492
1493 # Get thread context with caching enabled for efficiency
1494 # Use mention_id as until_id to exclude tweets that occurred after this mention
1495 try:
1496 thread_data = x_client.get_thread_context(conversation_id, use_cache=True, until_id=mention_id)
1497 if not thread_data:
1498 logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
1499 return False
1500
1501 # If this mention references a specific tweet, ensure we have that tweet in context
1502 if referenced_tweets:
1503 for ref in referenced_tweets:
1504 if ref.get('type') == 'replied_to':
1505 ref_id = ref.get('id')
1506 # Check if the referenced tweet is in our thread data
1507 thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
1508 if ref_id and ref_id not in thread_tweet_ids:
1509 logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
1510 try:
1511 # Fetch the missing referenced tweet directly
1512 endpoint = f"/tweets/{ref_id}"
1513 params = {
1514 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
1515 "user.fields": "id,name,username",
1516 "expansions": "author_id"
1517 }
1518 logger.debug(f"Fetching individual missing tweet: GET {endpoint}")
1519 response = x_client._make_request(endpoint, params)
1520 if response and "data" in response:
1521 missing_tweet = response["data"]
1522 if missing_tweet.get('conversation_id') == conversation_id:
1523 # Add to thread data
1524 if 'tweets' not in thread_data:
1525 thread_data['tweets'] = []
1526 thread_data['tweets'].append(missing_tweet)
1527
1528 # Add user data if available
1529 if "includes" in response and "users" in response["includes"]:
1530 if 'users' not in thread_data:
1531 thread_data['users'] = {}
1532 for user in response["includes"]["users"]:
1533 thread_data['users'][user["id"]] = user
1534
1535 logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
1536 else:
1537 logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
1538 except Exception as e:
1539 logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")
1540
1541 # Enhanced thread context debugging
1542 logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
1543 thread_posts = thread_data.get('tweets', [])
1544 thread_users = thread_data.get('users', {})
1545 logger.info(f" Posts in thread: {len(thread_posts)}")
1546 logger.info(f" Users in thread: {len(thread_users)}")
1547
1548 # Log thread participants for Grok detection
1549 for user_id, user_info in thread_users.items():
1550 username = user_info.get('username', 'unknown')
1551 name = user_info.get('name', 'Unknown')
1552 is_verified = user_info.get('verified', False)
1553 logger.info(f" User {user_id}: @{username} ({name}) verified={is_verified}")
1554
1555 # Special logging for Grok or AI-related users
1556 if 'grok' in username.lower() or 'grok' in name.lower():
1557 logger.info(f" 🤖 DETECTED GROK USER: @{username} ({name})")
1558
1559 # Log conversation structure
1560 for i, post in enumerate(thread_posts[:5]): # Log first 5 posts
1561 post_id = post.get('id')
1562 post_author = post.get('author_id')
1563 post_text = post.get('text', '')[:50]
1564 is_reply = 'in_reply_to_user_id' in post
1565 logger.info(f" Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")
1566
1567 except Exception as e:
1568 logger.error(f"❌ Error getting thread context: {e}")
1569 return False
1570
1571 # Convert to YAML string
1572 thread_context = thread_to_yaml_string(thread_data)
1573 logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")
1574
1575 # Save comprehensive conversation data for debugging
1576 try:
1577 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1578 debug_dir.mkdir(parents=True, exist_ok=True)
1579
1580 # Save raw thread data (JSON)
1581 with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
1582 json.dump(thread_data, f, indent=2)
1583
1584 # Save YAML thread context
1585 with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
1586 f.write(thread_context)
1587
1588 # Save mention processing debug info
1589 debug_info = {
1590 'processed_at': datetime.now().isoformat(),
1591 'mention_id': mention_id,
1592 'conversation_id': conversation_id,
1593 'author_id': author_id,
1594 'in_reply_to_user_id': in_reply_to_user_id,
1595 'referenced_tweets': referenced_tweets,
1596 'thread_stats': {
1597 'total_posts': len(thread_posts),
1598 'total_users': len(thread_users),
1599 'yaml_length': len(thread_context)
1600 },
1601 'users_in_conversation': {
1602 user_id: {
1603 'username': user_info.get('username'),
1604 'name': user_info.get('name'),
1605 'verified': user_info.get('verified', False),
1606 'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
1607 }
1608 for user_id, user_info in thread_users.items()
1609 }
1610 }
1611
1612 with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
1613 json.dump(debug_info, f, indent=2)
1614
1615 logger.info(f"💾 Saved conversation debug data to: {debug_dir}")
1616
1617 except Exception as debug_error:
1618 logger.warning(f"Failed to save debug data: {debug_error}")
1619 # Continue processing even if debug save fails
1620
1621 # Check for #voidstop
1622 if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
1623 logger.info("Found #voidstop, skipping this mention")
1624 return True
1625
1626 # Ensure X user blocks are attached
1627 try:
1628 ensure_x_user_blocks_attached(thread_data, void_agent.id)
1629 except Exception as e:
1630 logger.warning(f"Failed to ensure X user blocks: {e}")
1631 # Continue without user blocks rather than failing completely
1632
1633 # Create prompt for Letta agent
1634 author_info = thread_data.get('users', {}).get(author_id, {})
1635 author_username = author_info.get('username', 'unknown')
1636 author_name = author_info.get('name', author_username)
1637
1638 prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).
1639
1640MOST RECENT POST (the mention you're responding to):
1641"{mention_text}"
1642
1643FULL THREAD CONTEXT:
1644```yaml
1645{thread_context}
1646```
1647
1648The YAML above shows the complete conversation thread. Respond to the most recent post quoted above, and use the full thread context to understand the conversation flow.
1649
1650If you need to update user information, use the x_user_* tools.
1651
1652To reply, use the add_post_to_x_thread tool:
1653- Each call creates one post (max 280 characters)
1654- For most responses, a single call is sufficient
1655- Only use multiple calls for threaded replies when:
1656 * The topic requires extended explanation that cannot fit in 280 characters
1657 * You're explicitly asked for a detailed/long response
1658 * The conversation naturally benefits from a structured multi-part answer
1659- Avoid unnecessary threads - be concise when possible"""
1660
1661 # Log mention processing
1662 title = f"X MENTION FROM @{author_username}"
1663 print(f"\n▶ {title}")
1664 print(f" {'═' * len(title)}")
1665 for line in mention_text.split('\n'):
1666 print(f" {line}")
1667
1668 # Send to Letta agent
1669 from letta_client import Letta
1670
1671 config = get_x_letta_config()
1672 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
1673
1674 prompt_char_count = len(prompt)
1675 logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars | prompt: {prompt_char_count} chars")
1676
1677 try:
1678 # Use streaming to avoid timeout errors
1679 message_stream = letta_client.agents.messages.create_stream(
1680 agent_id=void_agent.id,
1681 messages=[{"role": "user", "content": prompt}],
1682 stream_tokens=False,
1683 max_steps=100
1684 )
1685
1686 # Collect streaming response (simplified version of bsky.py logic)
1687 all_messages = []
1688 for chunk in message_stream:
1689 if hasattr(chunk, 'message_type'):
1690 if chunk.message_type == 'reasoning_message':
1691 print("\n◆ Reasoning")
1692 print(" ─────────")
1693 for line in chunk.reasoning.split('\n'):
1694 print(f" {line}")
1695 elif chunk.message_type == 'tool_call_message':
1696 tool_name = chunk.tool_call.name
1697 if tool_name == 'add_post_to_x_thread':
1698 try:
1699 args = json.loads(chunk.tool_call.arguments)
1700 text = args.get('text', '')
1701 if text:
1702 print("\n✎ X Post")
1703 print(" ────────")
1704 for line in text.split('\n'):
1705 print(f" {line}")
1706 except Exception:
1707 pass # best-effort console preview; never interrupt streaming on a malformed tool call
1708 elif tool_name == 'halt_activity':
1709 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
1710 if queue_filepath and queue_filepath.exists():
1711 queue_filepath.unlink()
1712 logger.info(f"Deleted queue file: {queue_filepath.name}")
1713 logger.info("=== X BOT TERMINATED BY AGENT ===")
1714 exit(0)
1715 elif chunk.message_type == 'tool_return_message':
1716 tool_name = chunk.name
1717 status = chunk.status
1718 if status == 'success' and tool_name == 'add_post_to_x_thread':
1719 print("\n✓ X Post Queued")
1720 print(" ──────────────")
1721 print(" Post queued successfully")
1722 elif chunk.message_type == 'assistant_message':
1723 print("\n▶ Assistant Response")
1724 print(" ──────────────────")
1725 for line in chunk.content.split('\n'):
1726 print(f" {line}")
1727
1728 all_messages.append(chunk)
1729 if str(chunk) == 'done':
1730 break
1731
1732 # Convert streaming response for compatibility
1733 message_response = type('StreamingResponse', (), {
1734 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
1735 })()
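# The ad-hoc object above only needs a .messages attribute, so the extraction
# passes below can treat the streamed chunks like a non-streaming Letta response.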
1736
1737 except Exception as api_error:
1738 logger.error(f"Letta API error: {api_error}")
1739 raise
1740
1741 # Extract successful add_post_to_x_thread tool calls
1742 reply_candidates = []
1743 tool_call_results = {}
1744 ignored_notification = False
1745 ack_note = None # Track any note from annotate_ack tool
1746
1747 # First pass: collect tool return statuses
1748 for message in message_response.messages:
1749 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
1750 if message.name == 'add_post_to_x_thread':
1751 tool_call_results[message.tool_call_id] = message.status
1752 elif message.name == 'ignore_notification':
1753 if message.status == 'success':
1754 ignored_notification = True
1755 logger.info("🚫 X notification ignored")
1756
1757 # Second pass: collect successful tool calls
1758 for message in message_response.messages:
1759 if hasattr(message, 'tool_call') and message.tool_call:
1760 # Collect annotate_ack tool calls
1761 if message.tool_call.name == 'annotate_ack':
1762 try:
1763 args = json.loads(message.tool_call.arguments)
1764 note = args.get('note', '')
1765 if note:
1766 ack_note = note
1767 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
1768 except json.JSONDecodeError as e:
1769 logger.error(f"Failed to parse annotate_ack arguments: {e}")
1770
1771 # Collect add_post_to_x_thread tool calls - only if they were successful
1772 elif message.tool_call.name == 'add_post_to_x_thread':
1773 tool_call_id = message.tool_call.tool_call_id
1774 tool_status = tool_call_results.get(tool_call_id, 'unknown')
1775
1776 if tool_status == 'success':
1777 try:
1778 args = json.loads(message.tool_call.arguments)
1779 reply_text = args.get('text', '')
1780 if reply_text:
1781 reply_candidates.append(reply_text)
1782 logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
1783 except json.JSONDecodeError as e:
1784 logger.error(f"Failed to parse tool call arguments: {e}")
1785
1786 # Save agent response data to debug folder
1787 try:
1788 debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
1789
1790 # Save complete agent interaction
1791 agent_response_data = {
1792 'processed_at': datetime.now().isoformat(),
1793 'mention_id': mention_id,
1794 'conversation_id': conversation_id,
1795 'prompt_sent': prompt,
1796 'reply_candidates': reply_candidates,
1797 'ignored_notification': ignored_notification,
1798 'ack_note': ack_note,
1799 'tool_call_results': tool_call_results,
1800 'all_messages': []
1801 }
1802
1803 # Convert messages to serializable format
1804 for message in message_response.messages:
1805 msg_data = {
1806 'message_type': getattr(message, 'message_type', 'unknown'),
1807 'content': getattr(message, 'content', ''),
1808 'reasoning': getattr(message, 'reasoning', ''),
1809 'status': getattr(message, 'status', ''),
1810 'name': getattr(message, 'name', ''),
1811 }
1812
1813 if hasattr(message, 'tool_call') and message.tool_call:
1814 msg_data['tool_call'] = {
1815 'name': message.tool_call.name,
1816 'arguments': message.tool_call.arguments,
1817 'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
1818 }
1819
1820 agent_response_data['all_messages'].append(msg_data)
1821
1822 with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
1823 json.dump(agent_response_data, f, indent=2)
1824
1825 logger.info(f"💾 Saved agent response debug data")
1826
1827 except Exception as debug_error:
1828 logger.warning(f"Failed to save agent response debug data: {debug_error}")
1829
1830 # Handle conflicts
1831 if reply_candidates and ignored_notification:
1832 logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
1833 return False
1834
1835 if reply_candidates:
1836 # Post replies to X
1837 logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")
1838
1839 if len(reply_candidates) == 1:
1840 content = reply_candidates[0]
1841 title = f"Reply to @{author_username}"
1842 else:
1843 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
1844 title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"
1845
1846 print(f"\n✎ {title}")
1847 print(f" {'─' * len(title)}")
1848 for line in content.split('\n'):
1849 print(f" {line}")
1850
1851 if testing_mode:
1852 logger.info("TESTING MODE: Skipping actual X post")
1853 return True
1854 else:
1855 # Post to X using thread approach
1856 success = post_x_thread_replies(x_client, mention_id, reply_candidates)
1857 if success:
1858 logger.info(f"Successfully replied to @{author_username} on X")
1859
1860 # Acknowledge the post we're replying to
1861 try:
1862 ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
1863 if ack_result:
1864 if ack_note:
1865 logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
1866 else:
1867 logger.info(f"Successfully acknowledged X post from @{author_username}")
1868 else:
1869 logger.warning(f"Failed to acknowledge X post from @{author_username}")
1870 except Exception as e:
1871 logger.error(f"Error acknowledging X post from @{author_username}: {e}")
1872 # Don't fail the entire operation if acknowledgment fails
1873
1874 return True
1875 else:
1876 logger.error(f"Failed to send reply to @{author_username} on X")
1877 return False
1878 else:
1879 if ignored_notification:
1880 logger.info(f"X mention from @{author_username} was explicitly ignored")
1881 return "ignored"
1882 else:
1883 logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
1884 return False # Keep in queue for retry instead of removing
1885
except XRateLimitError:
raise # propagate so the queue-processing loop can back off and restart
1886 except Exception as e:
1887 logger.error(f"Error processing X mention: {e}")
1888 return False
1889
1890def acknowledge_x_post(x_client, post_id, note=None):
1891 """
1892 Acknowledge an X post that we replied to.
1893 Uses the Bluesky client to upload the acknowledgment record to the void data
1894 repository on atproto, mirroring how Bluesky acknowledgments are stored.
1895
1896 Args:
1897 x_client: XClient instance (not used, kept for compatibility)
1898 post_id: The X post ID we're acknowledging
1899 note: Optional note to include with the acknowledgment
1900
1901 Returns:
1902 True if successful, False otherwise
1903 """
1904 try:
1905 # Use Bluesky client to upload acks to the void data repository on atproto
1906 bsky_client = bsky_utils.default_login()
1907
1908 # Create a synthetic URI and CID for the X post
1909 # X posts don't have atproto URIs/CIDs, so we create identifiers
1910 post_uri = f"x://twitter.com/post/{post_id}"
1911 post_cid = f"x_{post_id}_cid" # Synthetic CID for X posts
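# Illustrative mapping (hypothetical post id):
#   post_id "1234567890123456789"
#     -> post_uri "x://twitter.com/post/1234567890123456789"
#     -> post_cid "x_1234567890123456789_cid"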
1912
1913 # Use the same acknowledge_post function as Bluesky
1914 ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)
1915
1916 if ack_result:
1917 logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
1918 return True
1919 else:
1920 logger.error(f"Failed to acknowledge X post {post_id}")
1921 return False
1922
1923 except Exception as e:
1924 logger.error(f"Error acknowledging X post {post_id}: {e}")
1925 return False
1926
1927def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
1928 """
1929 Post a series of replies to X, threading them properly.
1930
1931 Args:
1932 x_client: XClient instance
1933 in_reply_to_tweet_id: The original tweet ID to reply to
1934 reply_messages: List of reply text strings
1935
1936 Returns:
1937 True if successful, False otherwise
1938 """
1939 try:
1940 current_reply_id = in_reply_to_tweet_id
1941
1942 for i, reply_text in enumerate(reply_messages):
1943 logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")
1944
1945 result = x_client.post_reply(reply_text, current_reply_id)
1946
1947 if result and 'data' in result:
1948 new_tweet_id = result['data']['id']
1949 logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
1950 # For threading, the next reply should reply to this one
1951 current_reply_id = new_tweet_id
1952 else:
1953 logger.error(f"Failed to post X reply {i+1}")
1954 return False
1955
1956 return True
1957
1958 except Exception as e:
1959 logger.error(f"Error posting X thread replies: {e}")
1960 return False
1961
1962def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
1963 """
1964 Load and process all X mentions from the queue.
1965 Similar to bsky.py load_and_process_queued_notifications but for X.
1966 """
1967 try:
1968 # Get all X mention files in queue directory
1969 queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
1970
1971 if not queue_files:
1972 return
1973
1974 # Load file metadata and sort by creation time (chronological order)
1975 file_metadata = []
1976 for filepath in queue_files:
1977 try:
1978 with open(filepath, 'r') as f:
1979 queue_data = json.load(f)
1980 mention_data = queue_data.get('mention', queue_data)
1981 created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z') # Default to epoch if missing
1982 file_metadata.append((created_at, filepath))
1983 except Exception as e:
1984 logger.warning(f"Error reading queue file {filepath.name}: {e}")
1985 # Add with default timestamp so it still gets processed
1986 file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))
1987
1988 # Sort by creation time (oldest first)
1989 file_metadata.sort(key=lambda x: x[0])
1990
1991 logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")
1992
1993 for i, (created_at, filepath) in enumerate(file_metadata, 1):
1994 logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")
1995
1996 try:
1997 # Load mention data
1998 with open(filepath, 'r') as f:
1999 queue_data = json.load(f)
2000
2001 mention_data = queue_data.get('mention', queue_data)
2002
2003 # Process the mention
2004 success = process_x_mention(void_agent, x_client, mention_data,
2005 queue_filepath=filepath, testing_mode=testing_mode)
2006
2007 except XRateLimitError:
2008 logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
2009 break
2010
2011 except Exception as e:
2012 logger.error(f"Error processing X queue file {filepath.name}: {e}")
2013 continue
2014
2015 # Handle file based on processing result
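# Dispatch on the value returned by process_x_mention:
#   True       -> replied: delete the queue file (kept in testing mode) and
#                 record the mention id as processed
#   None       -> unrecoverable error: move the file to x_queue/errors/
#   "no_reply" -> agent chose not to reply: move the file to x_queue/no_reply/
#   "ignored"  -> agent ignored the notification: delete the file
#   False      -> transient failure: keep the file in the queue for retry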
2016 if success is True:
2017 if testing_mode:
2018 logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
2019 else:
2020 filepath.unlink()
2021 logger.info(f"Successfully processed and removed X file: {filepath.name}")
2022
2023 # Mark as processed
2024 processed_mentions = load_processed_mentions()
2025 processed_mentions.add(mention_data.get('id'))
2026 save_processed_mentions(processed_mentions)
2027
2028 elif success is None: # Move to error directory
2029 error_dir = X_QUEUE_DIR / "errors"
2030 error_dir.mkdir(exist_ok=True)
2031 error_path = error_dir / filepath.name
2032 filepath.rename(error_path)
2033 logger.warning(f"Moved X file {filepath.name} to errors directory")
2034
2035 elif success == "no_reply": # Move to no_reply directory
2036 no_reply_dir = X_QUEUE_DIR / "no_reply"
2037 no_reply_dir.mkdir(exist_ok=True)
2038 no_reply_path = no_reply_dir / filepath.name
2039 filepath.rename(no_reply_path)
2040 logger.info(f"Moved X file {filepath.name} to no_reply directory")
2041
2042 elif success == "ignored": # Delete ignored notifications
2043 filepath.unlink()
2044 logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")
2045
2046 else:
2047 logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")
2048
2049 except Exception as e:
2050 logger.error(f"Error loading queued X mentions: {e}")
2051
2052def process_x_notifications(void_agent, x_client, testing_mode=False):
2053 """
2054 Fetch new X mentions, queue them, and process the queue.
2055 Similar to bsky.py process_notifications but for X.
2056 """
2057 try:
2058 # Get username for fetching mentions - uses cached data to avoid rate limits
2059 username = x_client.get_username()
2060 if not username:
2061 logger.error("Could not get username for X mentions")
2062 return
2063
2064 # Fetch and queue new mentions
2065 new_count = fetch_and_queue_mentions(username)
2066
2067 if new_count > 0:
2068 logger.info(f"Found {new_count} new X mentions to process")
2069
2070 # Process the entire queue
2071 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
2072
2073 except Exception as e:
2074 logger.error(f"Error processing X notifications: {e}")
2075
2076def periodic_user_block_cleanup(client, agent_id: str) -> None:
2077 """
2078 Detach all user blocks from the agent to prevent memory bloat.
2079 This should be called periodically to ensure clean state.
2080 """
2081 try:
2082 # Get all blocks attached to the agent
2083 attached_blocks = client.agents.blocks.list(agent_id=agent_id)
2084
2085 user_blocks_to_detach = []
2086 for block in attached_blocks:
2087 if hasattr(block, 'label') and block.label.startswith('user_'):
2088 user_blocks_to_detach.append({
2089 'label': block.label,
2090 'id': block.id
2091 })
2092
2093 if not user_blocks_to_detach:
2094 logger.debug("No user blocks found to detach during periodic cleanup")
2095 return
2096
2097 logger.info(f"Found {len(user_blocks_to_detach)} user blocks to detach")
2098
2099 # Detach each user block
2100 for block in user_blocks_to_detach:
2101 try:
2102 client.agents.blocks.detach(
2103 agent_id=agent_id,
2104 block_id=block['id']
2105 )
2106 logger.debug(f"Detached user block: {block['label']}")
2107 except Exception as e:
2108 logger.error(f"Failed to detach user block {block['label']}: {e}")
2109
2110 logger.info(f"Periodic cleanup complete: detached {len(user_blocks_to_detach)} user blocks")
2111
2112 except Exception as e:
2113 logger.error(f"Error during periodic user block cleanup: {e}")
2114
2115def initialize_x_void():
2116 """Initialize the void agent for X operations."""
2117 logger.info("Starting void agent initialization for X...")
2118
2119 from letta_client import Letta
2120
2121 # Get config
2122 config = get_x_letta_config()
2123 client = Letta(token=config['api_key'], timeout=config['timeout'])
2124 agent_id = config['agent_id']
2125
2126 try:
2127 void_agent = client.agents.retrieve(agent_id=agent_id)
2128 logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
2129 except Exception as e:
2130 logger.error(f"Failed to load void agent {agent_id}: {e}")
2131 raise e
2132
2133 # Clean up all user blocks at startup
2134 logger.info("🧹 Cleaning up user blocks at X startup...")
2135 periodic_user_block_cleanup(client, agent_id)
2136
2137 # Ensure correct tools are attached for X
2138 logger.info("Configuring tools for X platform...")
2139 try:
2140 from tool_manager import ensure_platform_tools
2141 ensure_platform_tools('x', void_agent.id, config['api_key'])
2142 except Exception as e:
2143 logger.error(f"Failed to configure platform tools: {e}")
2144 logger.warning("Continuing with existing tool configuration")
2145
2146 # Log agent details
2147 logger.info(f"X Void agent details - ID: {void_agent.id}")
2148 logger.info(f"Agent name: {void_agent.name}")
2149
2150 return void_agent
2151
2152def x_main_loop(testing_mode=False, cleanup_interval=10):
2153 """
2154 Main X bot loop that continuously monitors for mentions and processes them.
2155 Similar to bsky.py main() but for X/Twitter.
2156
2157 Args:
2158 testing_mode: If True, don't actually post to X
2159 cleanup_interval: Run user block cleanup every N cycles (0 to disable)
2160 """
2161 import time
2162 from time import sleep
2163 from letta_client import Letta
2164
2165 logger.info("=== STARTING X VOID BOT ===")
2166
2167 # Configure logging from config file
2168 setup_logging_from_config()
2169
2170 # Initialize void agent
2171 void_agent = initialize_x_void()
2172 logger.info(f"X void agent initialized: {void_agent.id}")
2173
2174 # Initialize X client
2175 x_client = create_x_client()
2176 logger.info("Connected to X API")
2177
2178 # Get Letta client for periodic cleanup
2179 config = get_x_letta_config()
2180 letta_client = Letta(token=config['api_key'], timeout=config['timeout'])
2181
2182 # Main loop
2183 FETCH_DELAY_SEC = 120 # Check every 2 minutes for X mentions (increased from 60s to conserve API calls)
2184 logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")
2185
2186 if testing_mode:
2187 logger.info("=== RUNNING IN X TESTING MODE ===")
2188 logger.info(" - No messages will be sent to X")
2189 logger.info(" - Queue files will not be deleted")
2190
2191 if cleanup_interval > 0:
2192 logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
2193 else:
2194 logger.info("User block cleanup disabled")
2195
2196 cycle_count = 0
2197 start_time = time.time()
2198
2199 while True:
2200 try:
2201 cycle_count += 1
2202 logger.info(f"=== X CYCLE {cycle_count} ===")
2203
2204 # Process X notifications (fetch, queue, and process)
2205 process_x_notifications(void_agent, x_client, testing_mode)
2206
2207 # Run periodic cleanup every N cycles
2208 if cleanup_interval > 0 and cycle_count % cleanup_interval == 0:
2209 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
2210 periodic_user_block_cleanup(letta_client, void_agent.id)
2211
2212 # Log cycle completion
2213 elapsed_time = time.time() - start_time
2214 logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")
2215
2216 sleep(FETCH_DELAY_SEC)
2217
2218 except KeyboardInterrupt:
2219 elapsed_time = time.time() - start_time
2220 logger.info("=== X BOT STOPPED BY USER ===")
2221 logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
2222 break
2223 except Exception as e:
2224 logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
2225 logger.error(f"Error details: {e}")
2226 logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
2227 sleep(FETCH_DELAY_SEC * 2)
2228
2229def process_queue_only(testing_mode=False):
2230 """
2231 Process all queued X mentions without fetching new ones.
2232 Useful for rate limit management - queue first, then process separately.
2233
2234 Args:
2235 testing_mode: If True, don't actually post to X and keep queue files
2236 """
2237 logger.info("=== PROCESSING X QUEUE ONLY ===")
2238
2239 if testing_mode:
2240 logger.info("=== RUNNING IN X TESTING MODE ===")
2241 logger.info(" - No messages will be sent to X")
2242 logger.info(" - Queue files will not be deleted")
2243
2244 try:
2245 # Initialize void agent
2246 void_agent = initialize_x_void()
2247 logger.info(f"X void agent initialized: {void_agent.id}")
2248
2249 # Initialize X client
2250 x_client = create_x_client()
2251 logger.info("Connected to X API")
2252
2253 # Process the queue without fetching new mentions
2254 logger.info("Processing existing X queue...")
2255 load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)
2256
2257 logger.info("=== X QUEUE PROCESSING COMPLETE ===")
2258
2259 except Exception as e:
2260 logger.error(f"Error processing X queue: {e}")
2261 raise
2262
2263def x_notification_loop():
2264 """
2265 DEPRECATED: Old X notification loop using search-based mention detection.
2266 Use x_main_loop() instead for the full bot experience.
2267 """
2268 logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
2269 x_main_loop()
2270
2271if __name__ == "__main__":
2272 import sys
2273 import argparse
2274
2275 if len(sys.argv) > 1:
2276 if sys.argv[1] == "bot":
2277 # Main bot with optional --test flag and cleanup interval
2278 parser = argparse.ArgumentParser(description='X Void Bot')
2279 parser.add_argument('command', choices=['bot'])
2280 parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
2281 parser.add_argument('--cleanup-interval', type=int, default=10,
2282 help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
2283 args = parser.parse_args()
2284 x_main_loop(testing_mode=args.test, cleanup_interval=args.cleanup_interval)
2285 elif sys.argv[1] == "loop":
2286 x_notification_loop()
2287 elif sys.argv[1] == "reply":
2288 reply_to_cameron_post()
2289 elif sys.argv[1] == "me":
2290 get_my_user_info()
2291 elif sys.argv[1] == "search":
2292 test_search_mentions()
2293 elif sys.argv[1] == "queue":
2294 test_fetch_and_queue()
2295 elif sys.argv[1] == "thread":
2296 test_thread_context()
2297 elif sys.argv[1] == "process":
2298 # Process all queued mentions with optional --test flag
2299 testing_mode = "--test" in sys.argv
2300 process_queue_only(testing_mode=testing_mode)
2301 elif sys.argv[1] == "letta":
2302 # Use specific agent ID if provided, otherwise use from config
2303 agent_id = sys.argv[2] if len(sys.argv) > 2 else None
2304 test_letta_integration(agent_id)
2305 elif sys.argv[1] == "downrank":
2306 # View or manage downrank list
2307 if len(sys.argv) > 2 and sys.argv[2] == "list":
2308 downrank_users = load_downrank_users()
2309 if downrank_users:
2310 print(f"📋 Downrank users ({len(downrank_users)} total):")
2311 for user_id in sorted(downrank_users):
2312 print(f" - {user_id}")
2313 else:
2314 print("📋 No downrank users configured")
2315 else:
2316 print("Usage: python x.py downrank list")
2317 print(" list - Show all downranked user IDs")
2318 print(f" Edit {X_DOWNRANK_USERS_FILE} to modify the list")
2319 else:
2320 print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
2321 print(" bot - Run the main X bot (use --test for testing mode)")
2322 print(" Example: python x.py bot --test")
2323 print(" queue - Fetch and queue mentions only (no processing)")
2324 print(" process - Process all queued mentions only (no fetching)")
2325 print(" Example: python x.py process --test")
2326 print(" downrank - Manage downrank users (10% response rate)")
2327 print(" Example: python x.py downrank list")
2328 print(" loop - Run the old notification monitoring loop (deprecated)")
2329 print(" reply - Reply to Cameron's specific post")
2330 print(" me - Get authenticated user info and correct user ID")
2331 print(" search - Test search-based mention detection")
2332 print(" thread - Test thread context retrieval from queued mention")
2333 print(" letta - Test sending thread context to Letta agent")
2334 print(" Optional: python x.py letta <agent-id>")
2335 else:
2336 test_x_client()