"""a digital person for bluesky"""

import os
import logging
import requests
import yaml
import json
import hashlib
import random
import time
from typing import Optional, Dict, Any, List, Set
from datetime import datetime
from pathlib import Path
from requests_oauthlib import OAuth1
from rich import print as rprint
from rich.panel import Panel
from rich.text import Text

import bsky_utils


class XRateLimitError(Exception):
    """Exception raised when the X API rate limit is exceeded."""
    pass


# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")

# X-specific file paths
X_QUEUE_DIR = Path("x_queue")
X_CACHE_DIR = Path("x_cache")
X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json")
X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json")
X_DOWNRANK_USERS_FILE = Path("x_downrank_users.txt")

class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions."""

    def __init__(self, api_key: str, user_id: str, access_token: Optional[str] = None,
                 consumer_key: Optional[str] = None, consumer_secret: Optional[str] = None,
                 access_token_secret: Optional[str] = None):
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"

        # Pick an auth scheme based on which credentials were provided
        if consumer_key and consumer_secret and access_token and access_token_secret:
            # Use OAuth 1.0a for User Context
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use an OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json"
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Fall back to an Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None) -> Optional[Dict]:
        """Make a request to the X API with proper error handling."""
        url = f"{self.base_url}{endpoint}"

        try:
            if method.upper() == "GET":
                if self.oauth:
                    response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
                else:
                    response = requests.get(url, headers=self.headers, params=params)
            elif method.upper() == "POST":
                if self.oauth:
                    response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
                else:
                    response = requests.post(url, headers=self.headers, json=data)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if response.status_code == 401:
                logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
                logger.error(f"Response: {response.text}")
            elif response.status_code == 403:
                logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
                logger.error(f"Response: {response.text}")
            elif response.status_code == 429:
                # Sleep before surfacing the error so the caller can restart cleanly
                logger.error("X API rate limit exceeded - sleeping 60 seconds before raising")
                logger.error(f"Response: {response.text}")
                time.sleep(60)
                raise XRateLimitError("X API rate limit exceeded")
            else:
                logger.error(f"X API request failed: {e}")
                logger.error(f"Response: {response.text}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error making X API request: {e}")
            return None

    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
        """
        Fetch mentions for the configured user.

        Args:
            since_id: Minimum post ID to include (for fetching newer mentions)
            max_results: Number of results to return (5-100)

        Returns:
            List of mention objects, or an empty list if the request failed
            or returned no data
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Clamp to API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if response:
            logger.debug(f"X API response: {response}")

        if response and "data" in response:
            mentions = response["data"]
            logger.info(f"Retrieved {len(mentions)} mentions")
            return mentions
        else:
            if response:
                logger.info(f"No mentions in response. Full response: {response}")
            else:
                logger.warning("Request failed - no response received")
            return []

    def get_user_info(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user."""
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def search_mentions(self, username: str, max_results: int = 10, since_id: Optional[str] = None) -> Optional[List[Dict]]:
        """
        Search for mentions using the search endpoint instead of the mentions endpoint.
        This may have better rate limits than the direct mentions endpoint.

        Args:
            username: Username to search for mentions of (without the @)
            max_results: Number of results to return (10-100)
            since_id: Only return results newer than this tweet ID

        Returns:
            List of tweets mentioning the username, or an empty list on failure
        """
        endpoint = "/tweets/search/recent"

        # Search for mentions of the username
        query = f"@{username}"

        params = {
            "query": query,
            "max_results": min(max(max_results, 10), 100),
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Searching for mentions of @{username}")
        response = self._make_request(endpoint, params)

        if response and "data" in response:
            tweets = response["data"]
            logger.info(f"Found {len(tweets)} mentions via search")
            return tweets
        else:
            if response:
                logger.info(f"No mentions found via search. Response: {response}")
            else:
                logger.warning("Search request failed")
            return []

    def get_thread_context(self, conversation_id: str, use_cache: bool = True, until_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get all tweets in a conversation thread up to a specific tweet ID.

        Args:
            conversation_id: The conversation ID to fetch (should be the original tweet ID)
            use_cache: Whether to use cached data if available
            until_id: Optional tweet ID to use as an upper bound (excludes posts after this ID)

        Returns:
            Dict with 'tweets' (ordered chronologically) and 'users' keys,
            or None if no tweets were found
        """
        # Check the cache first if enabled
        if use_cache:
            cached_data = get_cached_thread_context(conversation_id)
            if cached_data:
                return cached_data

        # First, get the original tweet directly, since it might not appear in conversation search
        original_tweet = None
        try:
            endpoint = f"/tweets/{conversation_id}"
            params = {
                "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                "user.fields": "id,name,username",
                "expansions": "author_id"
            }
            response = self._make_request(endpoint, params)
            if response and "data" in response:
                original_tweet = response["data"]
                logger.info(f"Retrieved original tweet: {original_tweet.get('id')}")
        except Exception as e:
            logger.warning(f"Could not fetch original tweet {conversation_id}: {e}")

        # Then search for all tweets in this conversation
        endpoint = "/tweets/search/recent"
        params = {
            "query": f"conversation_id:{conversation_id}",
            "max_results": 100,  # Get as many as possible
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
            "sort_order": "recency"  # Newest first; we sort chronologically later
        }

        # Add the until_id parameter to exclude tweets posted after the mention being processed
        if until_id:
            params["until_id"] = until_id
            logger.info(f"Using until_id={until_id} to exclude future tweets")

        logger.info(f"Fetching thread context for conversation {conversation_id}")
        response = self._make_request(endpoint, params)

        tweets = []
        users_data = {}

        # Collect tweets from the search
        if response and "data" in response:
            tweets.extend(response["data"])
            # Store user data for reference
            if "includes" in response and "users" in response["includes"]:
                for user in response["includes"]["users"]:
                    users_data[user["id"]] = user

        # Add the original tweet if we got it and it's not already in the list
        if original_tweet:
            tweet_ids = [t.get('id') for t in tweets]
            if original_tweet.get('id') not in tweet_ids:
                tweets.append(original_tweet)
                logger.info("Added original tweet to thread context")

        # Attempt to fill gaps by fetching referenced tweets that are missing.
        # This helps with the X API's incomplete conversation search results.
        tweet_ids = set(t.get('id') for t in tweets)
        missing_tweet_ids = set()

        # Collect all referenced tweet IDs that aren't in our current set
        for tweet in tweets:
            referenced_tweets = tweet.get('referenced_tweets', [])
            for ref in referenced_tweets:
                ref_id = ref.get('id')
                if ref_id and ref_id not in tweet_ids:
                    missing_tweet_ids.add(ref_id)

        # Fetch missing referenced tweets individually
        for missing_id in missing_tweet_ids:
            try:
                endpoint = f"/tweets/{missing_id}"
                params = {
                    "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                    "user.fields": "id,name,username",
                    "expansions": "author_id"
                }
                response = self._make_request(endpoint, params)
                if response and "data" in response:
                    missing_tweet = response["data"]
                    # Only add it if it's actually part of this conversation
                    if missing_tweet.get('conversation_id') == conversation_id:
                        tweets.append(missing_tweet)
                        tweet_ids.add(missing_id)
                        logger.info(f"Retrieved missing referenced tweet: {missing_id}")

                        # Also add user data if available
                        if "includes" in response and "users" in response["includes"]:
                            for user in response["includes"]["users"]:
                                users_data[user["id"]] = user
            except Exception as e:
                logger.warning(f"Could not fetch missing tweet {missing_id}: {e}")

        if tweets:
            # Filter out tweets that occur after until_id (if specified)
            if until_id:
                original_count = len(tweets)
                # Convert until_id to int for comparison (tweet IDs increase over time)
                until_id_int = int(until_id)
                tweets = [t for t in tweets if int(t.get('id', '0')) <= until_id_int]
                filtered_count = len(tweets)
                if original_count != filtered_count:
                    logger.info(f"Filtered out {original_count - filtered_count} tweets after until_id {until_id}")

            # Sort chronologically (oldest first)
            tweets.sort(key=lambda x: x.get('created_at', ''))
            logger.info(f"Retrieved {len(tweets)} tweets in thread")

            thread_data = {"tweets": tweets, "users": users_data}

            # Cache the result
            if use_cache:
                save_cached_thread_context(conversation_id, thread_data)

            return thread_data
        else:
            logger.warning("No tweets found for thread context")
            return None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if result:
            logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
            return result
        else:
            logger.error("Failed to post reply")
            return None

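# A minimal usage sketch of the client above (illustrative only: the credential
# values and handle are placeholders, not real configuration). It constructs an
# XClient directly with an app-only bearer token and fetches recent mentions
# via the search endpoint.
def _example_client_usage() -> None:
    client = XClient(api_key="YOUR_BEARER_TOKEN", user_id="YOUR_USER_ID")
    mentions = client.search_mentions("your_handle", max_results=10)
    for mention in mentions or []:
        print(mention.get("id"), mention.get("text", "")[:80])
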
def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
    """Load X configuration from a config file."""
    try:
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)

        x_config = config.get('x', {})
        if not x_config.get('api_key') or not x_config.get('user_id'):
            raise ValueError("X API key and user_id must be configured in config.yaml")

        return x_config
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise


def create_x_client(config_path: str = "config.yaml") -> XClient:
    """Create and return an X client with configuration loaded from file."""
    config = load_x_config(config_path)
    return XClient(
        api_key=config['api_key'],
        user_id=config['user_id'],
        access_token=config.get('access_token'),
        consumer_key=config.get('consumer_key'),
        consumer_secret=config.get('consumer_secret'),
        access_token_secret=config.get('access_token_secret')
    )

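# For reference, the keys read above imply a config.yaml 'x' section shaped
# roughly like this (a sketch inferred from load_x_config/create_x_client;
# values are placeholders, and only api_key and user_id are required):
#
#   x:
#     api_key: "APP_ONLY_BEARER_TOKEN"
#     user_id: "1234567890"
#     access_token: "USER_ACCESS_TOKEN"            # optional
#     consumer_key: "CONSUMER_KEY"                 # optional (OAuth 1.0a)
#     consumer_secret: "CONSUMER_SECRET"           # optional (OAuth 1.0a)
#     access_token_secret: "ACCESS_TOKEN_SECRET"   # optional (OAuth 1.0a)
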
def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Convert a mention object to a YAML string for better AI comprehension.
    Similar to thread_to_yaml_string in bsky_utils.py.
    """
    # Extract the relevant fields
    simplified_mention = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': mention.get('author_id'),
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id')
    }

    # Add user information if available
    if users_data and mention.get('author_id') in users_data:
        user = users_data[mention.get('author_id')]
        simplified_mention['author'] = {
            'username': user.get('username'),
            'name': user.get('name')
        }

    return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)

def thread_to_yaml_string(thread_data: Dict) -> str:
    """
    Convert X thread context to a YAML string for AI comprehension.
    Similar to Bluesky's thread_to_yaml_string function.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()

    Returns:
        YAML string representation of the thread
    """
    if not thread_data or "tweets" not in thread_data:
        return "conversation: []\n"

    tweets = thread_data["tweets"]
    users_data = thread_data.get("users", {})

    simplified_thread = {
        "conversation": []
    }

    for tweet in tweets:
        # Get user info
        author_id = tweet.get('author_id')
        author_info = {}
        if author_id and author_id in users_data:
            user = users_data[author_id]
            author_info = {
                'username': user.get('username'),
                'name': user.get('name')
            }

        # Build the tweet object (simplified for AI consumption)
        tweet_obj = {
            'text': tweet.get('text'),
            'created_at': tweet.get('created_at'),
            'author': author_info,
            'author_id': author_id  # Include the user ID for block management
        }

        simplified_thread["conversation"].append(tweet_obj)

    return yaml.dump(simplified_thread, default_flow_style=False, sort_keys=False)

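# For a two-post thread, the YAML produced above looks roughly like this
# (illustrative values only):
#
#   conversation:
#   - text: original post text
#     created_at: '2025-01-01T00:00:00.000Z'
#     author:
#       username: alice
#       name: Alice
#     author_id: '111'
#   - text: reply text
#     created_at: '2025-01-01T00:01:00.000Z'
#     author:
#       username: bob
#       name: Bob
#     author_id: '222'
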
def ensure_x_user_blocks_attached(thread_data: Dict, agent_id: Optional[str] = None) -> None:
    """
    Ensure all users in the thread have their X user blocks attached.
    Creates blocks with initial content, including the user's handle, if they don't exist.

    Args:
        thread_data: Dict with 'tweets' and 'users' keys from get_thread_context()
        agent_id: The Letta agent ID to attach blocks to (defaults to the config agent_id)
    """
    if not thread_data or "users" not in thread_data:
        return

    try:
        from tools.blocks import attach_x_user_blocks, x_user_note_set
        from config_loader import get_letta_config
        from letta_client import Letta

        # Get the Letta client and agent_id from config
        config = get_letta_config()
        client = Letta(token=config['api_key'], timeout=config['timeout'])

        # Use the provided agent_id or fall back to the config
        if agent_id is None:
            agent_id = config['agent_id']

        # Create a mock agent_state carrying just the agent ID, as the block tools expect
        class MockAgentState:
            def __init__(self, agent_id):
                self.id = agent_id

        agent_state = MockAgentState(agent_id)

        users_data = thread_data["users"]
        user_ids = list(users_data.keys())

        if not user_ids:
            return

        logger.info(f"Ensuring X user blocks for {len(user_ids)} users: {user_ids}")

        # Get current blocks to check which users already have blocks with content
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        existing_user_blocks = {}

        for block in current_blocks:
            if block.label.startswith("x_user_"):
                user_id = block.label.replace("x_user_", "")
                existing_user_blocks[user_id] = block

        # Attach all user blocks (this will create missing ones with basic content)
        attach_result = attach_x_user_blocks(user_ids, agent_state)
        logger.info(f"X user block attachment result: {attach_result}")

        # For newly created blocks, update with user handle information
        for user_id in user_ids:
            if user_id not in existing_user_blocks:
                user_info = users_data[user_id]
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')

                # Set initial content with handle information
                initial_content = f"# X User: {user_id}\n\n**Handle:** @{username}\n**Name:** {name}\n\nNo additional information about this user yet."

                try:
                    x_user_note_set(user_id, initial_content, agent_state)
                    logger.info(f"Set initial content for X user {user_id} (@{username})")
                except Exception as e:
                    logger.error(f"Failed to set initial content for X user {user_id}: {e}")

    except Exception as e:
        logger.error(f"Error ensuring X user blocks: {e}")

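# Block labels follow the "x_user_<author_id>" convention used above: a thread
# participant with ID 1234567890 maps to the agent block label
# "x_user_1234567890" (the ID here is illustrative).
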
# X Caching and Queue System Functions

def load_last_seen_id() -> Optional[str]:
    """Load the last seen mention ID for incremental fetching."""
    if X_LAST_SEEN_FILE.exists():
        try:
            with open(X_LAST_SEEN_FILE, 'r') as f:
                data = json.load(f)
                return data.get('last_seen_id')
        except Exception as e:
            logger.error(f"Error loading last seen ID: {e}")
    return None


def save_last_seen_id(mention_id: str):
    """Save the last seen mention ID."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_LAST_SEEN_FILE, 'w') as f:
            json.dump({
                'last_seen_id': mention_id,
                'updated_at': datetime.now().isoformat()
            }, f)
        logger.debug(f"Saved last seen ID: {mention_id}")
    except Exception as e:
        logger.error(f"Error saving last seen ID: {e}")

def load_processed_mentions() -> set:
    """Load the set of processed mention IDs."""
    if X_PROCESSED_MENTIONS_FILE.exists():
        try:
            with open(X_PROCESSED_MENTIONS_FILE, 'r') as f:
                data = json.load(f)
                # Keep only recent entries (the last 10000)
                if len(data) > 10000:
                    data = data[-10000:]
                    save_processed_mentions(set(data))
                return set(data)
        except Exception as e:
            logger.error(f"Error loading processed mentions: {e}")
    return set()


def save_processed_mentions(processed_set: set):
    """Save the set of processed mention IDs."""
    try:
        X_QUEUE_DIR.mkdir(exist_ok=True)
        with open(X_PROCESSED_MENTIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed mentions: {e}")

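# A small illustrative helper (hypothetical; the real flow inlines this logic
# in load_and_process_queued_x_mentions below): record one mention ID as processed.
def _example_mark_processed(mention_id: str) -> None:
    processed = load_processed_mentions()
    processed.add(mention_id)
    save_processed_mentions(processed)
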
def load_downrank_users() -> Set[str]:
    """Load the set of user IDs that should be downranked (responded to 10% of the time)."""
    try:
        if not X_DOWNRANK_USERS_FILE.exists():
            return set()

        downrank_users = set()
        with open(X_DOWNRANK_USERS_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                # Skip empty lines and comments
                if line and not line.startswith('#'):
                    downrank_users.add(line)

        logger.info(f"Loaded {len(downrank_users)} downrank users")
        return downrank_users
    except Exception as e:
        logger.error(f"Error loading downrank users: {e}")
        return set()


def should_respond_to_downranked_user(user_id: str, downrank_users: Set[str]) -> bool:
    """
    Check whether we should respond to a downranked user.
    Returns True 10% of the time for downranked users and 100% of the time for everyone else.
    """
    if user_id not in downrank_users:
        return True

    # 10% chance for downranked users
    should_respond = random.random() < 0.1
    logger.info(f"Downranked user {user_id}: {'responding' if should_respond else 'skipping'} (10% chance)")
    return should_respond

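# An illustrative filter built on the two functions above (hypothetical helper,
# not called by the main flow): keep only the mentions that pass the downrank check.
def _example_filter_downranked(mentions: List[Dict]) -> List[Dict]:
    downrank_users = load_downrank_users()
    return [
        m for m in mentions
        if should_respond_to_downranked_user(str(m.get('author_id')), downrank_users)
    ]
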
def save_mention_to_queue(mention: Dict):
    """Save a mention to the queue directory for async processing."""
    try:
        mention_id = mention.get('id')
        if not mention_id:
            logger.error("Mention missing ID, cannot queue")
            return

        # Check whether it was already processed
        processed_mentions = load_processed_mentions()
        if mention_id in processed_mentions:
            logger.debug(f"Mention {mention_id} already processed, skipping")
            return

        # Create the queue directory
        X_QUEUE_DIR.mkdir(exist_ok=True)

        # Create a filename using a hash (similar to the Bluesky system)
        mention_str = json.dumps(mention, sort_keys=True)
        mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16]
        filename = f"x_mention_{mention_hash}.json"

        queue_file = X_QUEUE_DIR / filename

        # Save mention data with enhanced debugging information
        mention_data = {
            'mention': mention,
            'queued_at': datetime.now().isoformat(),
            'type': 'x_mention',
            # Debug info for conversation tracking
            'debug_info': {
                'mention_id': mention.get('id'),
                'author_id': mention.get('author_id'),
                'conversation_id': mention.get('conversation_id'),
                'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
                'referenced_tweets': mention.get('referenced_tweets', []),
                'text_preview': mention.get('text', '')[:200],
                'created_at': mention.get('created_at'),
                'public_metrics': mention.get('public_metrics', {}),
                'context_annotations': mention.get('context_annotations', [])
            }
        }

        with open(queue_file, 'w') as f:
            json.dump(mention_data, f, indent=2)

        logger.info(f"Queued X mention {mention_id} -> {filename}")

    except Exception as e:
        logger.error(f"Error saving mention to queue: {e}")

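# Queue files land in x_queue/ named x_mention_<16-hex-char-hash>.json, e.g.
# x_mention_0a1b2c3d4e5f6789.json (hash value illustrative), each containing
# the raw mention under 'mention' plus 'queued_at', 'type', and 'debug_info'.
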
# X Cache Functions

def get_cached_thread_context(conversation_id: str) -> Optional[Dict]:
    """Load cached thread context if available."""
    cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"
    if cache_file.exists():
        try:
            with open(cache_file, 'r') as f:
                cached_data = json.load(f)
            # Check whether the cache is recent (within 1 hour)
            from datetime import datetime, timedelta
            cached_time = datetime.fromisoformat(cached_data.get('cached_at', ''))
            if datetime.now() - cached_time < timedelta(hours=1):
                logger.info(f"Using cached thread context for {conversation_id}")
                return cached_data.get('thread_data')
        except Exception as e:
            logger.warning(f"Error loading cached thread context: {e}")
    return None


def save_cached_thread_context(conversation_id: str, thread_data: Dict):
    """Save thread context to the cache."""
    try:
        X_CACHE_DIR.mkdir(exist_ok=True)
        cache_file = X_CACHE_DIR / f"thread_{conversation_id}.json"

        cache_data = {
            'conversation_id': conversation_id,
            'thread_data': thread_data,
            'cached_at': datetime.now().isoformat()
        }

        with open(cache_file, 'w') as f:
            json.dump(cache_data, f, indent=2)

        logger.debug(f"Cached thread context for {conversation_id}")
    except Exception as e:
        logger.error(f"Error caching thread context: {e}")

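# A hedged maintenance sketch (hypothetical helper, assuming the cache file
# layout written by save_cached_thread_context above): delete cached threads
# whose 'cached_at' timestamp is older than a cutoff.
def _example_prune_thread_cache(max_age_hours: int = 24) -> None:
    from datetime import timedelta
    if not X_CACHE_DIR.exists():
        return
    cutoff = datetime.now() - timedelta(hours=max_age_hours)
    for cache_file in X_CACHE_DIR.glob("thread_*.json"):
        try:
            with open(cache_file, 'r') as f:
                cached_at = datetime.fromisoformat(json.load(f).get('cached_at', ''))
            if cached_at < cutoff:
                cache_file.unlink()
        except Exception:
            continue  # Skip unreadable or malformed cache files
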
def fetch_and_queue_mentions(username: str) -> int:
    """
    Single-pass function to fetch new mentions and queue them.
    Returns the number of new mentions found.
    """
    try:
        client = create_x_client()

        # Load the last seen ID for incremental fetching
        last_seen_id = load_last_seen_id()

        logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}")

        # Search for mentions
        mentions = client.search_mentions(
            username=username,
            since_id=last_seen_id,
            max_results=100  # Get as many as possible
        )

        if not mentions:
            logger.info("No new mentions found")
            return 0

        # Mentions arrive newest first, so reverse to process oldest first
        mentions.reverse()
        new_count = 0

        for mention in mentions:
            save_mention_to_queue(mention)
            new_count += 1

        # Update the last seen ID to the most recent mention
        if mentions:
            most_recent_id = mentions[-1]['id']  # Last after reverse = most recent
            save_last_seen_id(most_recent_id)

        logger.info(f"Queued {new_count} new X mentions")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queuing mentions: {e}")
        return 0

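# A minimal polling sketch (hypothetical; the interval is an assumption, not
# part of the original flow): fetch and queue new mentions on a fixed schedule.
# fetch_and_queue_mentions handles its own errors and returns 0 on failure,
# so the loop itself stays simple.
def _example_poll_mentions(username: str, interval_seconds: int = 120) -> None:
    while True:
        fetch_and_queue_mentions(username)
        time.sleep(interval_seconds)
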
# Simple test functions

def get_my_user_info():
    """Get the authenticated user's information to find the correct user ID."""
    try:
        client = create_x_client()

        # Use the /2/users/me endpoint to get authenticated user info
        endpoint = "/users/me"
        params = {
            "user.fields": "id,name,username,description"
        }

        print("Fetching authenticated user information...")
        response = client._make_request(endpoint, params=params)

        if response and "data" in response:
            user_data = response["data"]
            print("✅ Found authenticated user:")
            print(f"   ID: {user_data.get('id')}")
            print(f"   Username: @{user_data.get('username')}")
            print(f"   Name: {user_data.get('name')}")
            print(f"   Description: {user_data.get('description', 'N/A')[:100]}...")
            print("\n🔧 Update your config.yaml with:")
            print(f"   user_id: \"{user_data.get('id')}\"")
            return user_data
        else:
            print("❌ Failed to get user information")
            print(f"Response: {response}")
            return None

    except Exception as e:
        print(f"Error getting user info: {e}")
        return None

def test_search_mentions():
    """Test the search-based mention detection."""
    try:
        client = create_x_client()

        # First get our username
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        username = user_info["data"]["username"]
        print(f"🔍 Searching for mentions of @{username}")

        mentions = client.search_mentions(username, max_results=5)

        if mentions:
            print(f"✅ Found {len(mentions)} mentions via search:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...")
        else:
            print("No mentions found via search")

    except Exception as e:
        print(f"Search test failed: {e}")

def test_fetch_and_queue():
    """Test the single-pass fetch and queue function."""
    try:
        client = create_x_client()

        # Get our username
        user_info = client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            print("❌ Could not get username")
            return

        username = user_info["data"]["username"]
        print(f"🔄 Fetching and queueing mentions for @{username}")

        # Show the current state
        last_seen = load_last_seen_id()
        print(f"📍 Last seen ID: {last_seen or 'None (first run)'}")

        # Fetch and queue
        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            print(f"✅ Queued {new_count} new mentions")
            print("📁 Check the ./x_queue/ directory for queued mentions")

            # Show the updated state
            new_last_seen = load_last_seen_id()
            print(f"📍 Updated last seen ID: {new_last_seen}")
        else:
            print("ℹ️ No new mentions to queue")

    except Exception as e:
        print(f"Fetch and queue test failed: {e}")

def test_thread_context():
    """Test thread context retrieval from a queued mention."""
    try:
        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        print(f"📄 Using mention: {mention.get('id')}")
        print(f"📝 Text: {mention.get('text')}")

        # Check whether it has a conversation_id
        conversation_id = mention.get('conversation_id')
        if not conversation_id:
            print("❌ No conversation_id found in mention. May need to re-queue with updated fetch.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get the thread context
        client = create_x_client()
        thread_data = client.get_thread_context(conversation_id)

        if thread_data:
            tweets = thread_data.get('tweets', [])
            print(f"✅ Retrieved thread with {len(tweets)} tweets")

            # Convert to YAML
            yaml_thread = thread_to_yaml_string(thread_data)

            # Save the thread context for inspection
            thread_file = X_QUEUE_DIR / f"thread_context_{conversation_id}.yaml"
            with open(thread_file, 'w') as f:
                f.write(yaml_thread)

            print(f"💾 Saved thread context to: {thread_file}")
            print("\n📋 Thread preview:")
            print(yaml_thread)
        else:
            print("❌ Failed to retrieve thread context")

    except Exception as e:
        print(f"Thread context test failed: {e}")

def test_letta_integration(agent_id: Optional[str] = None):
    """Test sending X thread context to a Letta agent."""
    try:
        from letta_client import Letta

        # Load the full config to access the letta section
        try:
            with open("config.yaml", 'r') as f:
                full_config = yaml.safe_load(f)

            letta_config = full_config.get('letta', {})
            api_key = letta_config.get('api_key')
            config_agent_id = letta_config.get('agent_id')

            # Use the agent_id from config if not provided as a parameter
            if not agent_id:
                if config_agent_id:
                    agent_id = config_agent_id
                    print(f"ℹ️ Using agent_id from config: {agent_id}")
                else:
                    print("❌ No agent_id found in config.yaml")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    agent_id: your-agent-id")
                    return
            else:
                print(f"ℹ️ Using provided agent_id: {agent_id}")

            if not api_key:
                # Try loading from the environment as a fallback
                api_key = os.getenv('LETTA_API_KEY')
                if not api_key:
                    print("❌ LETTA_API_KEY not found in config.yaml or environment")
                    print("Expected config structure:")
                    print("  letta:")
                    print("    api_key: your-letta-api-key")
                    return
                else:
                    print("ℹ️ Using LETTA_API_KEY from environment")
            else:
                print("ℹ️ Using LETTA_API_KEY from config.yaml")

        except Exception as e:
            print(f"❌ Error loading config: {e}")
            return

        letta_client = Letta(token=api_key, timeout=600)
        print(f"🤖 Connected to Letta, using agent: {agent_id}")

        # Find a queued mention file
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))
        if not queue_files:
            print("❌ No queued mentions found. Run 'python x.py queue' first.")
            return

        # Read the first mention
        mention_file = queue_files[0]
        with open(mention_file, 'r') as f:
            mention_data = json.load(f)

        mention = mention_data['mention']
        conversation_id = mention.get('conversation_id')

        if not conversation_id:
            print("❌ No conversation_id found in mention.")
            return

        print(f"🧵 Getting thread context for conversation: {conversation_id}")

        # Get the thread context
        x_client = create_x_client()
        thread_data = x_client.get_thread_context(conversation_id)

        if not thread_data:
            print("❌ Failed to retrieve thread context")
            return

        # Convert to YAML
        yaml_thread = thread_to_yaml_string(thread_data)

        # Create the prompt for the agent
        prompt = f"""You are void, an AI agent operating on X (Twitter). You have received a mention and need to respond appropriately.

Here is the thread context:

{yaml_thread}

Please craft a response that continues this conversation naturally. Keep responses conversational and authentic to your void persona."""

        print("📤 Sending thread context to Letta agent...")

        # Print the prompt in a rich panel
        rprint(Panel(prompt, title="Prompt", border_style="blue"))

        # Send to the Letta agent using streaming
        message_stream = letta_client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": prompt}],
            stream_tokens=False,
            max_steps=10
        )

        print("🔄 Streaming response from agent...")
        response_text = ""

        for chunk in message_stream:
            print(chunk)
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'assistant_message':
                    print(f"🤖 Agent response: {chunk.content}")
                    response_text = chunk.content
                elif chunk.message_type == 'reasoning_message':
                    print(f"💭 Agent reasoning: {chunk.reasoning[:100]}...")
                elif chunk.message_type == 'tool_call_message':
                    print(f"🔧 Agent tool call: {chunk.tool_call.name}")

        if response_text:
            print("\n✅ Agent generated a response:")
            print(f"📝 Response: {response_text}")
        else:
            print("❌ No response generated by agent")

    except Exception as e:
        print(f"Letta integration test failed: {e}")
        import traceback
        traceback.print_exc()

def test_x_client():
    """Test the X client by fetching mentions."""
    try:
        client = create_x_client()
        mentions = client.get_mentions(max_results=5)

        if mentions:
            print(f"Successfully retrieved {len(mentions)} mentions:")
            for mention in mentions:
                print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
        else:
            print("No mentions retrieved")

    except Exception as e:
        print(f"Test failed: {e}")

def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not a Bearer token.
    The current Bearer token is Application-Only, which can't post.
    """
    try:
        client = create_x_client()

        # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
        cameron_post_id = "1950690566909710618"

        # Simple reply message
        reply_text = "Hello from void! 🤖 Testing X integration."

        print(f"Attempting to reply to post {cameron_post_id}")
        print(f"Reply text: {reply_text}")
        print("\nNOTE: This will fail with the current Bearer token (Application-Only)")
        print("Posting requires OAuth User Context authentication")

        result = client.post_reply(reply_text, cameron_post_id)

        if result:
            print("✅ Successfully posted reply!")
            print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
        else:
            print("❌ Failed to post reply (expected with current auth)")

    except Exception as e:
        print(f"Reply failed: {e}")

def process_x_mention(void_agent, x_client, mention_data, queue_filepath=None, testing_mode=False):
    """
    Process an X mention and generate a reply using the Letta agent.
    Similar to bsky.py's process_mention, but for X/Twitter.

    Args:
        void_agent: The Letta agent instance
        x_client: The X API client
        mention_data: The mention data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, don't actually post to X

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with a non-retryable error, move to the errors directory
        "no_reply": No reply was generated, move to the no_reply directory
        "ignored": Notification was explicitly ignored by the agent
    """
    try:
        logger.debug(f"Starting process_x_mention with mention_data type: {type(mention_data)}")

        # Extract mention details, handling both raw and queued mention formats
        if isinstance(mention_data, dict) and 'mention' in mention_data:
            mention = mention_data['mention']
        else:
            mention = mention_data

        mention_id = mention.get('id')
        mention_text = mention.get('text', '')
        author_id = mention.get('author_id')
        conversation_id = mention.get('conversation_id')
        in_reply_to_user_id = mention.get('in_reply_to_user_id')
        referenced_tweets = mention.get('referenced_tweets', [])

        # Check the downrank list - only respond to downranked users 10% of the time
        downrank_users = load_downrank_users()
        if not should_respond_to_downranked_user(str(author_id), downrank_users):
            logger.info(f"🔻 Skipping downranked user {author_id} - not in 10% selection")
            return "no_reply"

        # Enhanced conversation tracking for debugging - especially important for Grok handling
        logger.info(f"🔍 CONVERSATION DEBUG - Mention ID: {mention_id}")
        logger.info(f"   Author ID: {author_id}")
        logger.info(f"   Conversation ID: {conversation_id}")
        logger.info(f"   In Reply To User ID: {in_reply_to_user_id}")
        logger.info(f"   Referenced Tweets: {len(referenced_tweets)} items")
        for i, ref in enumerate(referenced_tweets[:3]):  # Log the first 3 referenced tweets
            logger.info(f"   Reference {i+1}: {ref.get('type')} -> {ref.get('id')}")
        logger.info(f"   Text preview: {mention_text[:100]}...")

        if not conversation_id:
            logger.warning(f"❌ No conversation_id found for mention {mention_id} - this may cause thread context issues")
            return None

        # Get the thread context (cache disabled to avoid missing-context issues).
        # Use mention_id as until_id to exclude tweets that occurred after this mention.
        try:
            thread_data = x_client.get_thread_context(conversation_id, use_cache=False, until_id=mention_id)
            if not thread_data:
                logger.error(f"❌ Failed to get thread context for conversation {conversation_id}")
                return False

            # If this mention references a specific tweet, ensure we have that tweet in context
            if referenced_tweets:
                for ref in referenced_tweets:
                    if ref.get('type') == 'replied_to':
                        ref_id = ref.get('id')
                        # Check whether the referenced tweet is in our thread data
                        thread_tweet_ids = [t.get('id') for t in thread_data.get('tweets', [])]
                        if ref_id and ref_id not in thread_tweet_ids:
                            logger.warning(f"Missing referenced tweet {ref_id} in thread context, attempting to fetch")
                            try:
                                # Fetch the missing referenced tweet directly
                                endpoint = f"/tweets/{ref_id}"
                                params = {
                                    "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets,conversation_id",
                                    "user.fields": "id,name,username",
                                    "expansions": "author_id"
                                }
                                response = x_client._make_request(endpoint, params)
                                if response and "data" in response:
                                    missing_tweet = response["data"]
                                    if missing_tweet.get('conversation_id') == conversation_id:
                                        # Add it to the thread data
                                        if 'tweets' not in thread_data:
                                            thread_data['tweets'] = []
                                        thread_data['tweets'].append(missing_tweet)

                                        # Add user data if available
                                        if "includes" in response and "users" in response["includes"]:
                                            if 'users' not in thread_data:
                                                thread_data['users'] = {}
                                            for user in response["includes"]["users"]:
                                                thread_data['users'][user["id"]] = user

                                        logger.info(f"✅ Added missing referenced tweet {ref_id} to thread context")
                                    else:
                                        logger.warning(f"Referenced tweet {ref_id} belongs to different conversation {missing_tweet.get('conversation_id')}")
                            except Exception as e:
                                logger.error(f"Failed to fetch referenced tweet {ref_id}: {e}")

            # Enhanced thread context debugging
            logger.info(f"🧵 THREAD CONTEXT DEBUG - Conversation ID: {conversation_id}")
            thread_posts = thread_data.get('tweets', [])
            thread_users = thread_data.get('users', {})
            logger.info(f"   Posts in thread: {len(thread_posts)}")
            logger.info(f"   Users in thread: {len(thread_users)}")

            # Log thread participants for Grok detection
            for user_id, user_info in thread_users.items():
                username = user_info.get('username', 'unknown')
                name = user_info.get('name', 'Unknown')
                is_verified = user_info.get('verified', False)
                logger.info(f"   User {user_id}: @{username} ({name}) verified={is_verified}")

                # Special logging for Grok or AI-related users
                if 'grok' in username.lower() or 'grok' in name.lower():
                    logger.info(f"   🤖 DETECTED GROK USER: @{username} ({name})")

            # Log the conversation structure
            for i, post in enumerate(thread_posts[:5]):  # Log the first 5 posts
                post_id = post.get('id')
                post_author = post.get('author_id')
                post_text = post.get('text', '')[:50]
                is_reply = 'in_reply_to_user_id' in post
                logger.info(f"   Post {i+1}: {post_id} by {post_author} (reply={is_reply}) - {post_text}...")

        except Exception as e:
            logger.error(f"❌ Error getting thread context: {e}")
            return False

        # Convert to a YAML string
        thread_context = thread_to_yaml_string(thread_data)
        logger.info(f"📄 Thread context generated, length: {len(thread_context)} characters")

        # Save comprehensive conversation data for debugging
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"
            debug_dir.mkdir(parents=True, exist_ok=True)

            # Save the raw thread data (JSON)
            with open(debug_dir / f"thread_data_{mention_id}.json", 'w') as f:
                json.dump(thread_data, f, indent=2)

            # Save the YAML thread context
            with open(debug_dir / f"thread_context_{mention_id}.yaml", 'w') as f:
                f.write(thread_context)

            # Save mention processing debug info
            debug_info = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'author_id': author_id,
                'in_reply_to_user_id': in_reply_to_user_id,
                'referenced_tweets': referenced_tweets,
                'thread_stats': {
                    'total_posts': len(thread_posts),
                    'total_users': len(thread_users),
                    'yaml_length': len(thread_context)
                },
                'users_in_conversation': {
                    user_id: {
                        'username': user_info.get('username'),
                        'name': user_info.get('name'),
                        'verified': user_info.get('verified', False),
                        'is_grok': 'grok' in user_info.get('username', '').lower() or 'grok' in user_info.get('name', '').lower()
                    }
                    for user_id, user_info in thread_users.items()
                }
            }

            with open(debug_dir / f"debug_info_{mention_id}.json", 'w') as f:
                json.dump(debug_info, f, indent=2)

            logger.info(f"💾 Saved conversation debug data to: {debug_dir}")

        except Exception as debug_error:
            logger.warning(f"Failed to save debug data: {debug_error}")
            # Continue processing even if the debug save fails

        # Check for #voidstop
        if "#voidstop" in thread_context.lower() or "#voidstop" in mention_text.lower():
            logger.info("Found #voidstop, skipping this mention")
            return True

        # Ensure X user blocks are attached
        try:
            ensure_x_user_blocks_attached(thread_data, void_agent.id)
        except Exception as e:
            logger.warning(f"Failed to ensure X user blocks: {e}")
            # Continue without user blocks rather than failing completely

        # Create the prompt for the Letta agent
        author_info = thread_data.get('users', {}).get(author_id, {})
        author_username = author_info.get('username', 'unknown')
        author_name = author_info.get('name', author_username)

        prompt = f"""You received a mention on X (Twitter) from @{author_username} ({author_name}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

If you need to update user information, use the x_user_* tools.

To reply, use the add_post_to_x_thread tool:
- Each call creates one post (max 280 characters)
- For most responses, a single call is sufficient
- Only use multiple calls for threaded replies when:
  * The topic requires extended explanation that cannot fit in 280 characters
  * You're explicitly asked for a detailed/long response
  * The conversation naturally benefits from a structured multi-part answer
- Avoid unnecessary threads - be concise when possible"""

        # Log the mention being processed
        title = f"X MENTION FROM @{author_username}"
        print(f"\n▶ {title}")
        print(f"  {'═' * len(title)}")
        for line in mention_text.split('\n'):
            print(f"  {line}")

        # Send to the Letta agent
        from config_loader import get_letta_config
        from letta_client import Letta

        config = get_letta_config()
        letta_client = Letta(token=config['api_key'], timeout=config['timeout'])

        logger.debug(f"Sending to LLM: @{author_username} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars")

        try:
            # Use streaming to avoid timeout errors
            message_stream = letta_client.agents.messages.create_stream(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}],
                stream_tokens=False,
                max_steps=100
            )

            # Collect the streaming response (simplified version of the bsky.py logic)
            all_messages = []
            for chunk in message_stream:
                if hasattr(chunk, 'message_type'):
                    if chunk.message_type == 'reasoning_message':
                        print("\n◆ Reasoning")
                        print("  ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f"  {line}")
                    elif chunk.message_type == 'tool_call_message':
                        tool_name = chunk.tool_call.name
                        if tool_name == 'add_post_to_x_thread':
                            try:
                                args = json.loads(chunk.tool_call.arguments)
                                text = args.get('text', '')
                                if text:
                                    print("\n✎ X Post")
                                    print("  ────────")
                                    for line in text.split('\n'):
                                        print(f"  {line}")
                            except (json.JSONDecodeError, AttributeError):
                                pass
                        elif tool_name == 'halt_activity':
                            logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING X BOT")
                            if queue_filepath and queue_filepath.exists():
                                queue_filepath.unlink()
                                logger.info(f"Deleted queue file: {queue_filepath.name}")
                            logger.info("=== X BOT TERMINATED BY AGENT ===")
                            raise SystemExit(0)
                    elif chunk.message_type == 'tool_return_message':
                        tool_name = chunk.name
                        status = chunk.status
                        if status == 'success' and tool_name == 'add_post_to_x_thread':
                            print("\n✓ X Post Queued")
                            print("  ──────────────")
                            print("  Post queued successfully")
                    elif chunk.message_type == 'assistant_message':
                        print("\n▶ Assistant Response")
                        print("  ──────────────────")
                        for line in chunk.content.split('\n'):
                            print(f"  {line}")

                all_messages.append(chunk)
                if str(chunk) == 'done':
                    break

            # Wrap the streaming response for compatibility with the extraction code below
            message_response = type('StreamingResponse', (), {
                'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
            })()

        except Exception as api_error:
            logger.error(f"Letta API error: {api_error}")
            raise

        # Extract successful add_post_to_x_thread tool calls
        reply_candidates = []
        tool_call_results = {}
        ignored_notification = False
        ack_note = None  # Track any note from the annotate_ack tool

        # First pass: collect tool return statuses
        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_x_thread':
                    tool_call_results[message.tool_call_id] = message.status
                elif message.name == 'ignore_notification':
                    if message.status == 'success':
                        ignored_notification = True
                        logger.info("🚫 X notification ignored")

        # Second pass: collect successful tool calls
        for message in message_response.messages:
            if hasattr(message, 'tool_call') and message.tool_call:
                # Collect annotate_ack tool calls
                if message.tool_call.name == 'annotate_ack':
                    try:
                        args = json.loads(message.tool_call.arguments)
                        note = args.get('note', '')
                        if note:
                            ack_note = note
                            logger.debug(f"Found annotate_ack with note: {note[:50]}...")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse annotate_ack arguments: {e}")

                # Collect add_post_to_x_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_x_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            if reply_text:
                                reply_candidates.append(reply_text)
                                logger.debug(f"Found successful add_post_to_x_thread candidate: {reply_text[:50]}...")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")

        # Save agent response data to the debug folder
        try:
            debug_dir = X_QUEUE_DIR / "debug" / f"conversation_{conversation_id}"

            # Save the complete agent interaction
            agent_response_data = {
                'processed_at': datetime.now().isoformat(),
                'mention_id': mention_id,
                'conversation_id': conversation_id,
                'prompt_sent': prompt,
                'reply_candidates': reply_candidates,
                'ignored_notification': ignored_notification,
                'ack_note': ack_note,
                'tool_call_results': tool_call_results,
                'all_messages': []
            }

            # Convert the messages to a serializable format
            for message in message_response.messages:
                msg_data = {
                    'message_type': getattr(message, 'message_type', 'unknown'),
                    'content': getattr(message, 'content', ''),
                    'reasoning': getattr(message, 'reasoning', ''),
                    'status': getattr(message, 'status', ''),
                    'name': getattr(message, 'name', ''),
                }

                if hasattr(message, 'tool_call') and message.tool_call:
                    msg_data['tool_call'] = {
                        'name': message.tool_call.name,
                        'arguments': message.tool_call.arguments,
                        'tool_call_id': getattr(message.tool_call, 'tool_call_id', '')
                    }

                agent_response_data['all_messages'].append(msg_data)

            with open(debug_dir / f"agent_response_{mention_id}.json", 'w') as f:
                json.dump(agent_response_data, f, indent=2)

            logger.info("💾 Saved agent response debug data")

        except Exception as debug_error:
            logger.warning(f"Failed to save agent response debug data: {debug_error}")

        # Handle conflicts
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_x_thread and ignore_notification!")
            return False

        if reply_candidates:
            # Post the replies to X
            logger.debug(f"Found {len(reply_candidates)} add_post_to_x_thread calls, posting to X")

            if len(reply_candidates) == 1:
                content = reply_candidates[0]
                title = f"Reply to @{author_username}"
            else:
                content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_candidates, 1)])
                title = f"Reply Thread to @{author_username} ({len(reply_candidates)} messages)"

            print(f"\n✎ {title}")
            print(f"  {'─' * len(title)}")
            for line in content.split('\n'):
                print(f"  {line}")

            if testing_mode:
                logger.info("TESTING MODE: Skipping actual X post")
                return True
            else:
                # Post to X using the threading approach
                success = post_x_thread_replies(x_client, mention_id, reply_candidates)
                if success:
                    logger.info(f"Successfully replied to @{author_username} on X")

                    # Acknowledge the post we're replying to
                    try:
                        ack_result = acknowledge_x_post(x_client, mention_id, ack_note)
                        if ack_result:
                            if ack_note:
                                logger.info(f"Successfully acknowledged X post from @{author_username} (note: \"{ack_note[:50]}...\")")
                            else:
                                logger.info(f"Successfully acknowledged X post from @{author_username}")
                        else:
                            logger.warning(f"Failed to acknowledge X post from @{author_username}")
                    except Exception as e:
                        logger.error(f"Error acknowledging X post from @{author_username}: {e}")
                        # Don't fail the entire operation if acknowledgment fails

                    return True
                else:
                    logger.error(f"Failed to send reply to @{author_username} on X")
                    return False
        else:
            if ignored_notification:
                logger.info(f"X mention from @{author_username} was explicitly ignored")
                return "ignored"
            else:
                logger.warning(f"No add_post_to_x_thread tool calls found for mention from @{author_username} - keeping in queue for next pass")
                return False  # Keep in queue for retry instead of removing

    except Exception as e:
        logger.error(f"Error processing X mention: {e}")
        return False

def acknowledge_x_post(x_client, post_id, note=None):
    """
    Acknowledge an X post that we replied to.
    Uses the Bluesky client and uploads to the void data repository on atproto,
    just like Bluesky acknowledgments.

    Args:
        x_client: XClient instance (not used, kept for compatibility)
        post_id: The X post ID we're acknowledging
        note: Optional note to include with the acknowledgment

    Returns:
        True if successful, False otherwise
    """
    try:
        # Use the Bluesky client to upload acks to the void data repository on atproto
        bsky_client = bsky_utils.default_login()

        # Create a synthetic URI and CID for the X post.
        # X posts don't have atproto URIs/CIDs, so we create identifiers.
        post_uri = f"x://twitter.com/post/{post_id}"
        post_cid = f"x_{post_id}_cid"  # Synthetic CID for X posts

        # Use the same acknowledge_post function as Bluesky
        ack_result = bsky_utils.acknowledge_post(bsky_client, post_uri, post_cid, note)

        if ack_result:
            logger.debug(f"Acknowledged X post {post_id} via atproto" + (f" with note: {note[:50]}..." if note else ""))
            return True
        else:
            logger.error(f"Failed to acknowledge X post {post_id}")
            return False

    except Exception as e:
        logger.error(f"Error acknowledging X post {post_id}: {e}")
        return False

def post_x_thread_replies(x_client, in_reply_to_tweet_id, reply_messages):
    """
    Post a series of replies to X, threading them properly.

    Args:
        x_client: XClient instance
        in_reply_to_tweet_id: The original tweet ID to reply to
        reply_messages: List of reply text strings

    Returns:
        True if successful, False otherwise
    """
    try:
        current_reply_id = in_reply_to_tweet_id

        for i, reply_text in enumerate(reply_messages):
            logger.info(f"Posting X reply {i+1}/{len(reply_messages)}: {reply_text[:50]}...")

            result = x_client.post_reply(reply_text, current_reply_id)

            if result and 'data' in result:
                new_tweet_id = result['data']['id']
                logger.info(f"Successfully posted X reply {i+1}, ID: {new_tweet_id}")
                # For threading, the next reply should reply to this one
                current_reply_id = new_tweet_id
            else:
                logger.error(f"Failed to post X reply {i+1}")
                return False

        return True

    except Exception as e:
        logger.error(f"Error posting X thread replies: {e}")
        return False

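# Threading sketch (hypothetical IDs): replying to tweet "100" with
# ["part 1", "part 2"] posts "part 1" as a reply to 100; if the API returns
# {"data": {"id": "101"}}, "part 2" is then posted as a reply to "101",
# so the candidates form one continuous thread.
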
def load_and_process_queued_x_mentions(void_agent, x_client, testing_mode=False):
    """
    Load and process all X mentions from the queue.
    Similar to bsky.py load_and_process_queued_notifications but for X.
    """
    try:
        # Get all X mention files in queue directory
        queue_files = list(X_QUEUE_DIR.glob("x_mention_*.json"))

        if not queue_files:
            return

        # Load file metadata and sort by creation time (chronological order)
        file_metadata = []
        for filepath in queue_files:
            try:
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)
                mention_data = queue_data.get('mention', queue_data)
                created_at = mention_data.get('created_at', '1970-01-01T00:00:00.000Z')  # Default to epoch if missing
                file_metadata.append((created_at, filepath))
            except Exception as e:
                logger.warning(f"Error reading queue file {filepath.name}: {e}")
                # Add with default timestamp so it still gets processed
                file_metadata.append(('1970-01-01T00:00:00.000Z', filepath))

        # Sort by creation time (oldest first)
        file_metadata.sort(key=lambda x: x[0])

        logger.info(f"Processing {len(file_metadata)} queued X mentions in chronological order")

        for i, (created_at, filepath) in enumerate(file_metadata, 1):
            logger.info(f"Processing X queue file {i}/{len(file_metadata)}: {filepath.name} (created: {created_at})")

            try:
                # Load mention data
                with open(filepath, 'r') as f:
                    queue_data = json.load(f)

                mention_data = queue_data.get('mention', queue_data)

                # Process the mention
                success = process_x_mention(void_agent, x_client, mention_data,
                                            queue_filepath=filepath, testing_mode=testing_mode)

            except XRateLimitError:
                logger.info("Rate limit hit - breaking out of queue processing to restart from beginning")
                break

            except Exception as e:
                logger.error(f"Error processing X queue file {filepath.name}: {e}")
                continue

            # Handle file based on processing result. The sentinel results
            # (None, "no_reply", "ignored") must be checked before the generic
            # truthiness test: non-empty strings are truthy, so testing
            # `if success:` first would swallow them.
            if success is None:  # Move to error directory
                error_dir = X_QUEUE_DIR / "errors"
                error_dir.mkdir(exist_ok=True)
                error_path = error_dir / filepath.name
                filepath.rename(error_path)
                logger.warning(f"Moved X file {filepath.name} to errors directory")

            elif success == "no_reply":  # Move to no_reply directory
                no_reply_dir = X_QUEUE_DIR / "no_reply"
                no_reply_dir.mkdir(exist_ok=True)
                no_reply_path = no_reply_dir / filepath.name
                filepath.rename(no_reply_path)
                logger.info(f"Moved X file {filepath.name} to no_reply directory")

            elif success == "ignored":  # Delete ignored notifications
                filepath.unlink()
                logger.info(f"🚫 Deleted ignored X notification: {filepath.name}")

            elif success:
                if testing_mode:
                    logger.info(f"TESTING MODE: Keeping X queue file: {filepath.name}")
                else:
                    filepath.unlink()
                    logger.info(f"Successfully processed and removed X file: {filepath.name}")

                    # Mark as processed
                    processed_mentions = load_processed_mentions()
                    processed_mentions.add(mention_data.get('id'))
                    save_processed_mentions(processed_mentions)

            else:
                logger.warning(f"⚠️ Failed to process X file {filepath.name}, keeping in queue for retry")

    except Exception as e:
        logger.error(f"Error loading queued X mentions: {e}")

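# Illustrative queue-file shape (field values hypothetical). The loader
# accepts either this wrapped form or a bare mention object:
#   x_queue/x_mention_1234567890.json
#   {"mention": {"id": "1234567890",
#                "text": "@voidbot hello",
#                "author_id": "9876543210",
#                "created_at": "2024-05-01T12:00:00.000Z"}}
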
def process_x_notifications(void_agent, x_client, testing_mode=False):
    """
    Fetch new X mentions, queue them, and process the queue.
    Similar to bsky.py process_notifications but for X.
    """
    try:
        # Get username for fetching mentions
        user_info = x_client._make_request("/users/me", params={"user.fields": "username"})
        if not user_info or "data" not in user_info:
            logger.error("Could not get username for X mentions")
            return

        username = user_info["data"]["username"]

        # Fetch and queue new mentions
        new_count = fetch_and_queue_mentions(username)

        if new_count > 0:
            logger.info(f"Found {new_count} new X mentions to process")

        # Process the entire queue
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing X notifications: {e}")

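# The /users/me lookup above expects a v2 response shaped roughly like
# {"data": {"id": "...", "name": "...", "username": "voidbot"}}
# (values hypothetical); only the username field is used here.
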
def initialize_x_void():
    """Initialize the void agent for X operations."""
    logger.info("Starting void agent initialization for X...")

    from config_loader import get_letta_config
    from letta_client import Letta

    # Get config
    config = get_letta_config()
    client = Letta(token=config['api_key'], timeout=config['timeout'])
    agent_id = config['agent_id']

    try:
        void_agent = client.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded void agent for X: {void_agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        raise  # Bare raise preserves the original traceback

    # Ensure correct tools are attached for X
    logger.info("Configuring tools for X platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('x', void_agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Log agent details
    logger.info(f"X Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")

    return void_agent

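# Based on how the config is used above, get_letta_config() is assumed to
# return at least these keys (values hypothetical):
#   {"api_key": "letta-...", "timeout": 60, "agent_id": "agent-..."}
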
def x_main_loop(testing_mode=False):
    """
    Main X bot loop that continuously monitors for mentions and processes them.
    Similar to bsky.py main() but for X/Twitter.
    """
    # time is imported at module level

    logger.info("=== STARTING X VOID BOT ===")

    # Initialize void agent
    void_agent = initialize_x_void()
    logger.info(f"X void agent initialized: {void_agent.id}")

    # Initialize X client
    x_client = create_x_client()
    logger.info("Connected to X API")

    # Main loop
    FETCH_DELAY_SEC = 60  # Check every minute for X mentions
    logger.info(f"Starting X mention monitoring, checking every {FETCH_DELAY_SEC} seconds")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    cycle_count = 0
    start_time = time.time()

    while True:
        try:
            cycle_count += 1
            logger.info(f"=== X CYCLE {cycle_count} ===")

            # Process X notifications (fetch, queue, and process)
            process_x_notifications(void_agent, x_client, testing_mode)

            # Log cycle completion
            elapsed_time = time.time() - start_time
            logger.info(f"X Cycle {cycle_count} complete. Elapsed: {elapsed_time/60:.1f} minutes")

            time.sleep(FETCH_DELAY_SEC)

        except KeyboardInterrupt:
            elapsed_time = time.time() - start_time
            logger.info("=== X BOT STOPPED BY USER ===")
            logger.info(f"Final X session: {cycle_count} cycles in {elapsed_time/60:.1f} minutes")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN X MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            logger.info(f"Sleeping for {FETCH_DELAY_SEC * 2} seconds due to error...")
            time.sleep(FETCH_DELAY_SEC * 2)

def process_queue_only(testing_mode=False):
    """
    Process all queued X mentions without fetching new ones.
    Useful for rate limit management - queue first, then process separately.

    Args:
        testing_mode: If True, don't actually post to X and keep queue files
    """
    logger.info("=== PROCESSING X QUEUE ONLY ===")

    if testing_mode:
        logger.info("=== RUNNING IN X TESTING MODE ===")
        logger.info("  - No messages will be sent to X")
        logger.info("  - Queue files will not be deleted")

    try:
        # Initialize void agent
        void_agent = initialize_x_void()
        logger.info(f"X void agent initialized: {void_agent.id}")

        # Initialize X client
        x_client = create_x_client()
        logger.info("Connected to X API")

        # Process the queue without fetching new mentions
        logger.info("Processing existing X queue...")
        load_and_process_queued_x_mentions(void_agent, x_client, testing_mode)

        logger.info("=== X QUEUE PROCESSING COMPLETE ===")

    except Exception as e:
        logger.error(f"Error processing X queue: {e}")
        raise

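# Rate-limit-friendly two-phase flow (shell usage, per the CLI below):
#   python x.py queue           # fetch mentions into x_queue/ only
#   python x.py process --test  # dry-run the queue without posting
#   python x.py process         # process the queue and post replies
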
def x_notification_loop():
    """
    DEPRECATED: Old X notification loop using search-based mention detection.
    Use x_main_loop() instead for the full bot experience.
    """
    logger.warning("x_notification_loop() is deprecated. Use x_main_loop() instead.")
    x_main_loop()

if __name__ == "__main__":
    import sys
    import argparse

    if len(sys.argv) > 1:
        if sys.argv[1] == "bot":
            # Main bot with optional --test flag
            parser = argparse.ArgumentParser(description='X Void Bot')
            parser.add_argument('command', choices=['bot'])
            parser.add_argument('--test', action='store_true', help='Run in testing mode (no actual posts)')
            args = parser.parse_args()
            x_main_loop(testing_mode=args.test)
        elif sys.argv[1] == "loop":
            x_notification_loop()
        elif sys.argv[1] == "reply":
            reply_to_cameron_post()
        elif sys.argv[1] == "me":
            get_my_user_info()
        elif sys.argv[1] == "search":
            test_search_mentions()
        elif sys.argv[1] == "queue":
            test_fetch_and_queue()
        elif sys.argv[1] == "thread":
            test_thread_context()
        elif sys.argv[1] == "process":
            # Process all queued mentions with optional --test flag
            testing_mode = "--test" in sys.argv
            process_queue_only(testing_mode=testing_mode)
        elif sys.argv[1] == "letta":
            # Use specific agent ID if provided, otherwise use from config
            agent_id = sys.argv[2] if len(sys.argv) > 2 else None
            test_letta_integration(agent_id)
        elif sys.argv[1] == "downrank":
            # View or manage downrank list
            if len(sys.argv) > 2 and sys.argv[2] == "list":
                downrank_users = load_downrank_users()
                if downrank_users:
                    print(f"📋 Downrank users ({len(downrank_users)} total):")
                    for user_id in sorted(downrank_users):
                        print(f"  - {user_id}")
                else:
                    print("📋 No downrank users configured")
            else:
                print("Usage: python x.py downrank list")
                print("  list - Show all downranked user IDs")
                print(f"  Edit {X_DOWNRANK_USERS_FILE} to modify the list")
        else:
            print("Usage: python x.py [bot|loop|reply|me|search|queue|process|thread|letta|downrank]")
            print("  bot      - Run the main X bot (use --test for testing mode)")
            print("             Example: python x.py bot --test")
            print("  queue    - Fetch and queue mentions only (no processing)")
            print("  process  - Process all queued mentions only (no fetching)")
            print("             Example: python x.py process --test")
            print("  downrank - Manage downrank users (10% response rate)")
            print("             Example: python x.py downrank list")
            print("  loop     - Run the old notification monitoring loop (deprecated)")
            print("  reply    - Reply to Cameron's specific post")
            print("  me       - Get authenticated user info and correct user ID")
            print("  search   - Test search-based mention detection")
            print("  thread   - Test thread context retrieval from queued mention")
            print("  letta    - Test sending thread context to Letta agent")
            print("             Optional: python x.py letta <agent-id>")
    else:
        test_x_client()