a digital person for bluesky

Implement X mention caching and queue system

- Add since_id-based incremental fetching to avoid rate limits
- Create x_queue/ directory system similar to Bluesky queue
- Track last seen mention ID for efficient polling
- Save mentions as JSON files for async processing
- Add single-pass fetch_and_queue_mentions() function
- Avoid duplicate processing with processed mentions tracking

Usage: python x.py queue (single pass, no loops)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+310 -16
x.py
··· 3 import requests 4 import yaml 5 import json 6 from typing import Optional, Dict, Any, List 7 from datetime import datetime 8 from requests_oauthlib import OAuth1 9 10 # Configure logging ··· 12 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 13 ) 14 logger = logging.getLogger("x_client") 15 16 class XClient: 17 """X (Twitter) API client for fetching mentions and managing interactions.""" ··· 142 response = self._make_request(endpoint, params) 143 return response.get("data") if response else None 144 145 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 146 """ 147 Post a reply to a specific tweet. ··· 223 224 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 225 226 # Simple test function 227 def test_x_client(): 228 """Test the X client by fetching mentions.""" 229 try: ··· 274 275 def x_notification_loop(): 276 """ 277 - Simple X notification loop that fetches mentions and logs them. 278 - Very basic version to understand the platform needs. 
279 """ 280 import time 281 import json 282 from pathlib import Path 283 284 - logger.info("=== STARTING X NOTIFICATION LOOP ===") 285 286 try: 287 client = create_x_client() 288 logger.info("X client initialized") 289 except Exception as e: 290 logger.error(f"Failed to initialize X client: {e}") 291 return ··· 294 last_mention_id = None 295 cycle_count = 0 296 297 - # Simple loop similar to bsky.py but much more basic 298 while True: 299 try: 300 cycle_count += 1 301 - logger.info(f"=== X CYCLE {cycle_count} ===") 302 303 - # Fetch mentions (newer than last seen) 304 - mentions = client.get_mentions( 305 since_id=last_mention_id, 306 max_results=10 307 ) 308 309 if mentions: 310 - logger.info(f"Found {len(mentions)} new mentions") 311 312 # Update last seen ID 313 if mentions: ··· 330 331 logger.info(f"Saved mention debug info to {mention_file}") 332 else: 333 - logger.info("No new mentions found") 334 335 - # Sleep between cycles (shorter than bsky for now) 336 logger.info("Sleeping for 60 seconds...") 337 time.sleep(60) 338 339 except KeyboardInterrupt: 340 - logger.info("=== X LOOP STOPPED BY USER ===") 341 - logger.info(f"Processed {cycle_count} cycles") 342 break 343 except Exception as e: 344 - logger.error(f"Error in X cycle {cycle_count}: {e}") 345 logger.info("Sleeping for 120 seconds due to error...") 346 time.sleep(120) 347 ··· 352 x_notification_loop() 353 elif sys.argv[1] == "reply": 354 reply_to_cameron_post() 355 else: 356 - print("Usage: python x.py [loop|reply]") 357 - print(" loop - Run the notification monitoring loop") 358 - print(" reply - Reply to Cameron's specific post") 359 else: 360 test_x_client()
··· 3 import requests 4 import yaml 5 import json 6 + import hashlib 7 from typing import Optional, Dict, Any, List 8 from datetime import datetime 9 + from pathlib import Path 10 from requests_oauthlib import OAuth1 11 12 # Configure logging ··· 14 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 15 ) 16 logger = logging.getLogger("x_client") 17 + 18 + # X-specific file paths 19 + X_QUEUE_DIR = Path("x_queue") 20 + X_PROCESSED_MENTIONS_FILE = Path("x_queue/processed_mentions.json") 21 + X_LAST_SEEN_FILE = Path("x_queue/last_seen_id.json") 22 23 class XClient: 24 """X (Twitter) API client for fetching mentions and managing interactions.""" ··· 149 response = self._make_request(endpoint, params) 150 return response.get("data") if response else None 151 152 + def search_mentions(self, username: str, max_results: int = 10, since_id: str = None) -> Optional[List[Dict]]: 153 + """ 154 + Search for mentions using the search endpoint instead of mentions endpoint. 155 + This might have better rate limits than the direct mentions endpoint. 
156 + 157 + Args: 158 + username: Username to search for mentions of (without @) 159 + max_results: Number of results to return (10-100) 160 + since_id: Only return results newer than this tweet ID 161 + 162 + Returns: 163 + List of tweets mentioning the username 164 + """ 165 + endpoint = "/tweets/search/recent" 166 + 167 + # Search for mentions of the username 168 + query = f"@{username}" 169 + 170 + params = { 171 + "query": query, 172 + "max_results": min(max(max_results, 10), 100), 173 + "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets", 174 + "user.fields": "id,name,username", 175 + "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id" 176 + } 177 + 178 + if since_id: 179 + params["since_id"] = since_id 180 + 181 + logger.info(f"Searching for mentions of @{username}") 182 + response = self._make_request(endpoint, params) 183 + 184 + if response and "data" in response: 185 + tweets = response["data"] 186 + logger.info(f"Found {len(tweets)} mentions via search") 187 + return tweets 188 + else: 189 + if response: 190 + logger.info(f"No mentions found via search. Response: {response}") 191 + else: 192 + logger.warning("Search request failed") 193 + return [] 194 + 195 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]: 196 """ 197 Post a reply to a specific tweet. 
··· 273 274 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False) 275 276 + # X Caching and Queue System Functions 277 + 278 + def load_last_seen_id() -> Optional[str]: 279 + """Load the last seen mention ID for incremental fetching.""" 280 + if X_LAST_SEEN_FILE.exists(): 281 + try: 282 + with open(X_LAST_SEEN_FILE, 'r') as f: 283 + data = json.load(f) 284 + return data.get('last_seen_id') 285 + except Exception as e: 286 + logger.error(f"Error loading last seen ID: {e}") 287 + return None 288 + 289 + def save_last_seen_id(mention_id: str): 290 + """Save the last seen mention ID.""" 291 + try: 292 + X_QUEUE_DIR.mkdir(exist_ok=True) 293 + with open(X_LAST_SEEN_FILE, 'w') as f: 294 + json.dump({ 295 + 'last_seen_id': mention_id, 296 + 'updated_at': datetime.now().isoformat() 297 + }, f) 298 + logger.debug(f"Saved last seen ID: {mention_id}") 299 + except Exception as e: 300 + logger.error(f"Error saving last seen ID: {e}") 301 + 302 + def load_processed_mentions() -> set: 303 + """Load the set of processed mention IDs.""" 304 + if X_PROCESSED_MENTIONS_FILE.exists(): 305 + try: 306 + with open(X_PROCESSED_MENTIONS_FILE, 'r') as f: 307 + data = json.load(f) 308 + # Keep only recent entries (last 10000) 309 + if len(data) > 10000: 310 + data = data[-10000:] 311 + save_processed_mentions(set(data)) 312 + return set(data) 313 + except Exception as e: 314 + logger.error(f"Error loading processed mentions: {e}") 315 + return set() 316 + 317 + def save_processed_mentions(processed_set: set): 318 + """Save the set of processed mention IDs.""" 319 + try: 320 + X_QUEUE_DIR.mkdir(exist_ok=True) 321 + with open(X_PROCESSED_MENTIONS_FILE, 'w') as f: 322 + json.dump(list(processed_set), f) 323 + except Exception as e: 324 + logger.error(f"Error saving processed mentions: {e}") 325 + 326 + def save_mention_to_queue(mention: Dict): 327 + """Save a mention to the queue directory for async processing.""" 328 + try: 329 + mention_id = mention.get('id') 330 + if 
not mention_id: 331 + logger.error("Mention missing ID, cannot queue") 332 + return 333 + 334 + # Check if already processed 335 + processed_mentions = load_processed_mentions() 336 + if mention_id in processed_mentions: 337 + logger.debug(f"Mention {mention_id} already processed, skipping") 338 + return 339 + 340 + # Create queue directory 341 + X_QUEUE_DIR.mkdir(exist_ok=True) 342 + 343 + # Create filename using hash (similar to Bluesky system) 344 + mention_str = json.dumps(mention, sort_keys=True) 345 + mention_hash = hashlib.sha256(mention_str.encode()).hexdigest()[:16] 346 + filename = f"x_mention_{mention_hash}.json" 347 + 348 + queue_file = X_QUEUE_DIR / filename 349 + 350 + # Save mention data 351 + with open(queue_file, 'w') as f: 352 + json.dump({ 353 + 'mention': mention, 354 + 'queued_at': datetime.now().isoformat(), 355 + 'type': 'x_mention' 356 + }, f, indent=2) 357 + 358 + logger.info(f"Queued X mention {mention_id} -> {queue_file}") 359 + 360 + except Exception as e: 361 + logger.error(f"Error saving mention to queue: {e}") 362 + 363 + def fetch_and_queue_mentions(username: str) -> int: 364 + """ 365 + Single-pass function to fetch new mentions and queue them. 366 + Returns number of new mentions found. 
367 + """ 368 + try: 369 + client = create_x_client() 370 + 371 + # Load last seen ID for incremental fetching 372 + last_seen_id = load_last_seen_id() 373 + 374 + logger.info(f"Fetching mentions for @{username} since {last_seen_id or 'beginning'}") 375 + 376 + # Search for mentions 377 + mentions = client.search_mentions( 378 + username=username, 379 + since_id=last_seen_id, 380 + max_results=100 # Get as many as possible 381 + ) 382 + 383 + if not mentions: 384 + logger.info("No new mentions found") 385 + return 0 386 + 387 + # Process mentions (newest first, so reverse to process oldest first) 388 + mentions.reverse() 389 + new_count = 0 390 + 391 + for mention in mentions: 392 + save_mention_to_queue(mention) 393 + new_count += 1 394 + 395 + # Update last seen ID to the most recent mention 396 + if mentions: 397 + most_recent_id = mentions[-1]['id'] # Last after reverse = most recent 398 + save_last_seen_id(most_recent_id) 399 + 400 + logger.info(f"Queued {new_count} new X mentions") 401 + return new_count 402 + 403 + except Exception as e: 404 + logger.error(f"Error fetching and queuing mentions: {e}") 405 + return 0 406 + 407 # Simple test function 408 + def get_my_user_info(): 409 + """Get the authenticated user's information to find correct user ID.""" 410 + try: 411 + client = create_x_client() 412 + 413 + # Use the /2/users/me endpoint to get authenticated user info 414 + endpoint = "/users/me" 415 + params = { 416 + "user.fields": "id,name,username,description" 417 + } 418 + 419 + print("Fetching authenticated user information...") 420 + response = client._make_request(endpoint, params=params) 421 + 422 + if response and "data" in response: 423 + user_data = response["data"] 424 + print(f"✅ Found authenticated user:") 425 + print(f" ID: {user_data.get('id')}") 426 + print(f" Username: @{user_data.get('username')}") 427 + print(f" Name: {user_data.get('name')}") 428 + print(f" Description: {user_data.get('description', 'N/A')[:100]}...") 429 + print(f"\n🔧 
Update your config.yaml with:") 430 + print(f" user_id: \"{user_data.get('id')}\"") 431 + return user_data 432 + else: 433 + print("❌ Failed to get user information") 434 + print(f"Response: {response}") 435 + return None 436 + 437 + except Exception as e: 438 + print(f"Error getting user info: {e}") 439 + return None 440 + 441 + def test_search_mentions(): 442 + """Test the search-based mention detection.""" 443 + try: 444 + client = create_x_client() 445 + 446 + # First get our username 447 + user_info = client._make_request("/users/me", params={"user.fields": "username"}) 448 + if not user_info or "data" not in user_info: 449 + print("❌ Could not get username") 450 + return 451 + 452 + username = user_info["data"]["username"] 453 + print(f"🔍 Searching for mentions of @{username}") 454 + 455 + mentions = client.search_mentions(username, max_results=5) 456 + 457 + if mentions: 458 + print(f"✅ Found {len(mentions)} mentions via search:") 459 + for mention in mentions: 460 + print(f"- {mention.get('id')}: {mention.get('text', '')[:100]}...") 461 + else: 462 + print("No mentions found via search") 463 + 464 + except Exception as e: 465 + print(f"Search test failed: {e}") 466 + 467 + def test_fetch_and_queue(): 468 + """Test the single-pass fetch and queue function.""" 469 + try: 470 + client = create_x_client() 471 + 472 + # Get our username 473 + user_info = client._make_request("/users/me", params={"user.fields": "username"}) 474 + if not user_info or "data" not in user_info: 475 + print("❌ Could not get username") 476 + return 477 + 478 + username = user_info["data"]["username"] 479 + print(f"🔄 Fetching and queueing mentions for @{username}") 480 + 481 + # Show current state 482 + last_seen = load_last_seen_id() 483 + print(f"📍 Last seen ID: {last_seen or 'None (first run)'}") 484 + 485 + # Fetch and queue 486 + new_count = fetch_and_queue_mentions(username) 487 + 488 + if new_count > 0: 489 + print(f"✅ Queued {new_count} new mentions") 490 + print(f"📁 Check 
./x_queue/ directory for queued mentions") 491 + 492 + # Show updated state 493 + new_last_seen = load_last_seen_id() 494 + print(f"📍 Updated last seen ID: {new_last_seen}") 495 + else: 496 + print("ℹ️ No new mentions to queue") 497 + 498 + except Exception as e: 499 + print(f"Fetch and queue test failed: {e}") 500 + 501 def test_x_client(): 502 """Test the X client by fetching mentions.""" 503 try: ··· 548 549 def x_notification_loop(): 550 """ 551 + X notification loop using search-based mention detection. 552 + Uses search endpoint instead of mentions endpoint for better rate limits. 553 """ 554 import time 555 import json 556 from pathlib import Path 557 558 + logger.info("=== STARTING X SEARCH-BASED NOTIFICATION LOOP ===") 559 560 try: 561 client = create_x_client() 562 logger.info("X client initialized") 563 + 564 + # Get our username for searching 565 + user_info = client._make_request("/users/me", params={"user.fields": "username"}) 566 + if not user_info or "data" not in user_info: 567 + logger.error("Could not get username for search") 568 + return 569 + 570 + username = user_info["data"]["username"] 571 + logger.info(f"Monitoring mentions of @{username}") 572 + 573 except Exception as e: 574 logger.error(f"Failed to initialize X client: {e}") 575 return ··· 578 last_mention_id = None 579 cycle_count = 0 580 581 + # Search-based loop with better rate limits 582 while True: 583 try: 584 cycle_count += 1 585 + logger.info(f"=== X SEARCH CYCLE {cycle_count} ===") 586 587 + # Search for mentions using search endpoint 588 + mentions = client.search_mentions( 589 + username=username, 590 since_id=last_mention_id, 591 max_results=10 592 ) 593 594 if mentions: 595 + logger.info(f"Found {len(mentions)} new mentions via search") 596 597 # Update last seen ID 598 if mentions: ··· 615 616 logger.info(f"Saved mention debug info to {mention_file}") 617 else: 618 + logger.info("No new mentions found via search") 619 620 + # Sleep between cycles - search might have 
better rate limits 621 logger.info("Sleeping for 60 seconds...") 622 time.sleep(60) 623 624 except KeyboardInterrupt: 625 + logger.info("=== X SEARCH LOOP STOPPED BY USER ===") 626 + logger.info(f"Processed {cycle_count} cycles") 627 break 628 except Exception as e: 629 + logger.error(f"Error in X search cycle {cycle_count}: {e}") 630 logger.info("Sleeping for 120 seconds due to error...") 631 time.sleep(120) 632 ··· 637 x_notification_loop() 638 elif sys.argv[1] == "reply": 639 reply_to_cameron_post() 640 + elif sys.argv[1] == "me": 641 + get_my_user_info() 642 + elif sys.argv[1] == "search": 643 + test_search_mentions() 644 + elif sys.argv[1] == "queue": 645 + test_fetch_and_queue() 646 else: 647 + print("Usage: python x.py [loop|reply|me|search|queue]") 648 + print(" loop - Run the notification monitoring loop") 649 + print(" reply - Reply to Cameron's specific post") 650 + print(" me - Get authenticated user info and correct user ID") 651 + print(" search - Test search-based mention detection") 652 + print(" queue - Test fetch and queue mentions (single pass)") 653 else: 654 test_x_client()
+5
x_debug/mention_1950690566909710618.yaml
···
··· 1 + id: '1950690566909710618' 2 + text: hey @void_comind 3 + author_id: '1232326955652931584' 4 + created_at: '2025-07-30T22:50:47.000Z' 5 + in_reply_to_user_id: null
+1
x_queue/last_seen_id.json
···
··· 1 + {"last_seen_id": "1950714596828061885", "updated_at": "2025-07-30T17:26:32.871012"}
+20
x_queue/x_mention_9d95c8dd59179a67.json
···
··· 1 + { 2 + "mention": { 3 + "author_id": "1232326955652931584", 4 + "in_reply_to_user_id": "1950680610282094592", 5 + "id": "1950714596828061885", 6 + "created_at": "2025-07-31T00:26:17.000Z", 7 + "text": "@void_comind sup", 8 + "edit_history_tweet_ids": [ 9 + "1950714596828061885" 10 + ], 11 + "referenced_tweets": [ 12 + { 13 + "type": "replied_to", 14 + "id": "1950707109240373317" 15 + } 16 + ] 17 + }, 18 + "queued_at": "2025-07-30T17:26:32.870254", 19 + "type": "x_mention" 20 + }
+13
x_queue/x_mention_d04913def179066b.json
···
··· 1 + { 2 + "mention": { 3 + "edit_history_tweet_ids": [ 4 + "1950690566909710618" 5 + ], 6 + "created_at": "2025-07-30T22:50:47.000Z", 7 + "text": "hey @void_comind", 8 + "id": "1950690566909710618", 9 + "author_id": "1232326955652931584" 10 + }, 11 + "queued_at": "2025-07-30T17:21:41.171268", 12 + "type": "x_mention" 13 + }