A digital person for Bluesky
"""X (Twitter) API v2 client: fetch mentions, post replies, run a polling loop."""

import json
import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any, List

import requests
import yaml
from requests_oauthlib import OAuth1

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("x_client")


class XClient:
    """X (Twitter) API client for fetching mentions and managing interactions."""

    # Seconds to wait on the X API before giving up. Without an explicit
    # timeout, `requests` may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key: str, user_id: str, access_token: Optional[str] = None,
                 consumer_key: Optional[str] = None, consumer_secret: Optional[str] = None,
                 access_token_secret: Optional[str] = None):
        """
        Pick an authentication method from the credentials supplied.

        Args:
            api_key: Bearer token used when no user-context credentials are given
            user_id: ID of the user whose mentions are fetched
            access_token: OAuth access token (OAuth 1.0a resource-owner key, or
                an OAuth 2.0 user-context bearer token when used on its own)
            consumer_key: OAuth 1.0a consumer key
            consumer_secret: OAuth 1.0a consumer secret
            access_token_secret: OAuth 1.0a resource-owner secret
        """
        self.api_key = api_key
        self.access_token = access_token
        self.user_id = user_id
        self.base_url = "https://api.x.com/2"

        # Check if we have OAuth 1.0a credentials
        if consumer_key and consumer_secret and access_token and access_token_secret:
            # Use OAuth 1.0a for User Context; the request is signed by the
            # auth object, so no Authorization header is set here.
            self.oauth = OAuth1(
                consumer_key,
                client_secret=consumer_secret,
                resource_owner_key=access_token,
                resource_owner_secret=access_token_secret,
            )
            self.headers = {"Content-Type": "application/json"}
            self.auth_method = "oauth1a"
            logger.info("Using OAuth 1.0a User Context authentication for X API")
        elif access_token:
            # Use OAuth 2.0 Bearer token for User Context
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            }
            self.auth_method = "oauth2_user"
            logger.info("Using OAuth 2.0 User Context access token for X API")
        else:
            # Use Application-Only Bearer token
            self.oauth = None
            self.headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            }
            self.auth_method = "bearer"
            logger.info("Using Application-Only Bearer token for X API")

    def _make_request(self, endpoint: str, params: Optional[Dict] = None,
                      method: str = "GET", data: Optional[Dict] = None) -> Optional[Dict]:
        """
        Make a request to the X API with proper error handling.

        Args:
            endpoint: Path appended to the base URL (e.g. "/tweets")
            params: Query parameters, sent for GET requests only
            method: "GET" or "POST" (case-insensitive)
            data: JSON body, sent for POST requests only

        Returns:
            Parsed JSON response, or None on any failure (HTTP error, network
            error, timeout, unsupported method). Failures are logged, not raised.
        """
        url = f"{self.base_url}{endpoint}"

        try:
            verb = method.upper()
            # `auth=None` is the same as omitting `auth`, so one call per verb
            # covers both the OAuth 1.0a and the bearer-token cases.
            if verb == "GET":
                response = requests.get(
                    url, headers=self.headers, params=params,
                    auth=self.oauth, timeout=self.REQUEST_TIMEOUT,
                )
            elif verb == "POST":
                response = requests.post(
                    url, headers=self.headers, json=data,
                    auth=self.oauth, timeout=self.REQUEST_TIMEOUT,
                )
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            status = response.status_code
            if status == 401:
                logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
            elif status == 403:
                logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
            elif status == 429:
                logger.error("X API rate limit exceeded")
            else:
                logger.error(f"X API request failed: {e}")
            logger.error(f"Response: {response.text}")
            return None
        except Exception as e:
            # Boundary handler: timeouts, connection errors, invalid JSON and
            # the unsupported-method ValueError all end up here.
            logger.error(f"Unexpected error making X API request: {e}")
            return None

    def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
        """
        Fetch mentions for the configured user.

        Args:
            since_id: Minimum Post ID to include (for getting newer mentions)
            max_results: Number of results to return (clamped to 5-100)

        Returns:
            List of mention objects; an empty list if the request failed or
            the response contained no "data" entry
        """
        endpoint = f"/users/{self.user_id}/mentions"
        params = {
            "max_results": min(max(max_results, 5), 100),  # Ensure within API limits
            "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
            "user.fields": "id,name,username",
            "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id",
        }

        if since_id:
            params["since_id"] = since_id

        logger.info(f"Fetching mentions for user {self.user_id}")
        response = self._make_request(endpoint, params)

        if not response:
            logger.warning("Request failed - no response received")
            return []

        logger.debug(f"X API response: {response}")

        if "data" not in response:
            logger.info(f"No mentions in response. Full response: {response}")
            return []

        mentions = response["data"]
        logger.info(f"Retrieved {len(mentions)} mentions")
        return mentions

    def get_user_info(self, user_id: str) -> Optional[Dict]:
        """Get information about a specific user, or None if the request failed."""
        endpoint = f"/users/{user_id}"
        params = {
            "user.fields": "id,name,username,description,public_metrics"
        }

        response = self._make_request(endpoint, params)
        return response.get("data") if response else None

    def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
        """
        Post a reply to a specific tweet.

        Args:
            reply_text: The text content of the reply
            in_reply_to_tweet_id: The ID of the tweet to reply to

        Returns:
            Response data if successful, None if failed
        """
        endpoint = "/tweets"

        payload = {
            "text": reply_text,
            "reply": {
                "in_reply_to_tweet_id": in_reply_to_tweet_id
            }
        }

        logger.info(f"Attempting to post reply with {self.auth_method} authentication")
        result = self._make_request(endpoint, method="POST", data=payload)

        if not result:
            logger.error("Failed to post reply")
            return None

        logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
        return result


def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
    """
    Load X configuration from config file.

    Args:
        config_path: Path to a YAML file with an `x` mapping

    Returns:
        The `x` mapping from the config file

    Raises:
        ValueError: if `api_key` or `user_id` is missing from the `x` section
        Exception: any error opening or parsing the file is logged and re-raised
    """
    try:
        with open(config_path, 'r', encoding="utf-8") as f:
            config = yaml.safe_load(f)

        # safe_load returns None for an empty file, and an `x:` key with no
        # value loads as None; treat both as "nothing configured" so the
        # caller gets the descriptive ValueError below instead of an
        # AttributeError.
        x_config = config.get('x') if isinstance(config, dict) else None
        if not isinstance(x_config, dict):
            x_config = {}

        if not x_config.get('api_key') or not x_config.get('user_id'):
            raise ValueError("X API key and user_id must be configured in config.yaml")

        return x_config
    except Exception as e:
        logger.error(f"Failed to load X configuration: {e}")
        raise


def create_x_client(config_path: str = "config.yaml") -> XClient:
    """Create and return an X client with configuration loaded from file."""
    config = load_x_config(config_path)
    return XClient(
        api_key=config['api_key'],
        user_id=config['user_id'],
        access_token=config.get('access_token'),
        consumer_key=config.get('consumer_key'),
        consumer_secret=config.get('consumer_secret'),
        access_token_secret=config.get('access_token_secret'),
    )


def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
    """
    Convert a mention object to a YAML string for better AI comprehension.
    Similar to thread_to_yaml_string in bsky_utils.py

    Args:
        mention: Mention object as returned in the API response's "data" list
        users_data: Optional mapping of author ID -> user object; when the
            mention's author is present, an `author` entry is added

    Returns:
        YAML document with the mention's key fields, in insertion order
    """
    author_id = mention.get('author_id')

    # Extract relevant fields
    simplified_mention = {
        'id': mention.get('id'),
        'text': mention.get('text'),
        'author_id': author_id,
        'created_at': mention.get('created_at'),
        'in_reply_to_user_id': mention.get('in_reply_to_user_id'),
    }

    # Add user information if available
    if users_data and author_id in users_data:
        user = users_data[author_id]
        simplified_mention['author'] = {
            'username': user.get('username'),
            'name': user.get('name'),
        }

    return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)


# Simple test function
def test_x_client():
    """Test the X client by fetching mentions."""
    try:
        client = create_x_client()
        mentions = client.get_mentions(max_results=5)

        if mentions:
            print(f"Successfully retrieved {len(mentions)} mentions:")
            for mention in mentions:
                # `text` may be absent; fall back to "" so slicing cannot fail.
                preview = (mention.get('text') or '')[:50]
                print(f"- {mention.get('id')}: {preview}...")
        else:
            print("No mentions retrieved")

    except Exception as e:
        print(f"Test failed: {e}")


def reply_to_cameron_post():
    """
    Reply to Cameron's specific X post.

    NOTE: This requires OAuth User Context authentication, not Bearer token.
    Current Bearer token is Application-Only which can't post.
    """
    try:
        client = create_x_client()

        # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
        cameron_post_id = "1950690566909710618"

        # Simple reply message
        reply_text = "Hello from void! 🤖 Testing X integration."

        print(f"Attempting to reply to post {cameron_post_id}")
        print(f"Reply text: {reply_text}")
        print("\nNOTE: This will fail with current Bearer token (Application-Only)")
        print("Posting requires OAuth User Context authentication")

        result = client.post_reply(reply_text, cameron_post_id)

        if result:
            print("✅ Successfully posted reply!")
            print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
        else:
            print("❌ Failed to post reply (expected with current auth)")

    except Exception as e:
        print(f"Reply failed: {e}")


def x_notification_loop(poll_interval: int = 60, error_backoff: int = 120):
    """
    Simple X notification loop that fetches mentions and logs them.
    Very basic version to understand the platform needs.

    Each mention is also written as YAML to `x_debug/mention_<id>.yaml`.
    Runs until interrupted with Ctrl-C.

    Args:
        poll_interval: Seconds to sleep between successful cycles
        error_backoff: Seconds to sleep after a cycle that raised
    """
    import time
    from pathlib import Path

    logger.info("=== STARTING X NOTIFICATION LOOP ===")

    try:
        client = create_x_client()
        logger.info("X client initialized")
    except Exception as e:
        logger.error(f"Failed to initialize X client: {e}")
        return

    # Track the last seen mention ID to avoid duplicates
    last_mention_id = None
    cycle_count = 0
    debug_dir = Path("x_debug")

    # The KeyboardInterrupt handler wraps the whole loop so Ctrl-C is handled
    # cleanly even while sleeping in the error back-off below.
    try:
        # Simple loop similar to bsky.py but much more basic
        while True:
            try:
                cycle_count += 1
                logger.info(f"=== X CYCLE {cycle_count} ===")

                # Fetch mentions (newer than last seen)
                mentions = client.get_mentions(
                    since_id=last_mention_id,
                    max_results=10,
                )

                if mentions:
                    logger.info(f"Found {len(mentions)} new mentions")

                    # Update last seen ID
                    last_mention_id = mentions[0]['id']  # Most recent first

                    # Created lazily, once per cycle, only when there is
                    # something to save.
                    debug_dir.mkdir(exist_ok=True)

                    # Process each mention (just log for now)
                    for mention in mentions:
                        logger.info(f"Mention from {mention.get('author_id')}: {mention.get('text', '')[:100]}...")

                        # Convert to YAML for inspection
                        yaml_mention = mention_to_yaml_string(mention)

                        # Save to file for inspection (temporary)
                        mention_file = debug_dir / f"mention_{mention['id']}.yaml"
                        with open(mention_file, 'w', encoding="utf-8") as f:
                            f.write(yaml_mention)

                        logger.info(f"Saved mention debug info to {mention_file}")
                else:
                    logger.info("No new mentions found")

                # Sleep between cycles (shorter than bsky for now)
                logger.info(f"Sleeping for {poll_interval} seconds...")
                time.sleep(poll_interval)

            except Exception as e:
                logger.error(f"Error in X cycle {cycle_count}: {e}")
                logger.info(f"Sleeping for {error_backoff} seconds due to error...")
                time.sleep(error_backoff)
    except KeyboardInterrupt:
        logger.info("=== X LOOP STOPPED BY USER ===")
        logger.info(f"Processed {cycle_count} cycles")


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        if sys.argv[1] == "loop":
            x_notification_loop()
        elif sys.argv[1] == "reply":
            reply_to_cameron_post()
        else:
            print("Usage: python x.py [loop|reply]")
            print("  loop  - Run the notification monitoring loop")
            print("  reply - Reply to Cameron's specific post")
    else:
        test_x_client()