a digital person for bluesky
1import os
2import logging
3import requests
4import yaml
5import json
6from typing import Optional, Dict, Any, List
7from datetime import datetime
8from requests_oauthlib import OAuth1
9
10# Configure logging
11logging.basicConfig(
12 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
13)
14logger = logging.getLogger("x_client")
15
16class XClient:
17 """X (Twitter) API client for fetching mentions and managing interactions."""
18
19 def __init__(self, api_key: str, user_id: str, access_token: str = None,
20 consumer_key: str = None, consumer_secret: str = None,
21 access_token_secret: str = None):
22 self.api_key = api_key
23 self.access_token = access_token
24 self.user_id = user_id
25 self.base_url = "https://api.x.com/2"
26
27 # Check if we have OAuth 1.0a credentials
28 if (consumer_key and consumer_secret and access_token and access_token_secret):
29 # Use OAuth 1.0a for User Context
30 self.oauth = OAuth1(
31 consumer_key,
32 client_secret=consumer_secret,
33 resource_owner_key=access_token,
34 resource_owner_secret=access_token_secret
35 )
36 self.headers = {"Content-Type": "application/json"}
37 self.auth_method = "oauth1a"
38 logger.info("Using OAuth 1.0a User Context authentication for X API")
39 elif access_token:
40 # Use OAuth 2.0 Bearer token for User Context
41 self.oauth = None
42 self.headers = {
43 "Authorization": f"Bearer {access_token}",
44 "Content-Type": "application/json"
45 }
46 self.auth_method = "oauth2_user"
47 logger.info("Using OAuth 2.0 User Context access token for X API")
48 else:
49 # Use Application-Only Bearer token
50 self.oauth = None
51 self.headers = {
52 "Authorization": f"Bearer {api_key}",
53 "Content-Type": "application/json"
54 }
55 self.auth_method = "bearer"
56 logger.info("Using Application-Only Bearer token for X API")
57
58 def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None) -> Optional[Dict]:
59 """Make a request to the X API with proper error handling."""
60 url = f"{self.base_url}{endpoint}"
61
62 try:
63 if method.upper() == "GET":
64 if self.oauth:
65 response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
66 else:
67 response = requests.get(url, headers=self.headers, params=params)
68 elif method.upper() == "POST":
69 if self.oauth:
70 response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
71 else:
72 response = requests.post(url, headers=self.headers, json=data)
73 else:
74 raise ValueError(f"Unsupported HTTP method: {method}")
75
76 response.raise_for_status()
77 return response.json()
78 except requests.exceptions.HTTPError as e:
79 if response.status_code == 401:
80 logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
81 logger.error(f"Response: {response.text}")
82 elif response.status_code == 403:
83 logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
84 logger.error(f"Response: {response.text}")
85 elif response.status_code == 429:
86 logger.error("X API rate limit exceeded")
87 logger.error(f"Response: {response.text}")
88 else:
89 logger.error(f"X API request failed: {e}")
90 logger.error(f"Response: {response.text}")
91 return None
92 except Exception as e:
93 logger.error(f"Unexpected error making X API request: {e}")
94 return None
95
96 def get_mentions(self, since_id: Optional[str] = None, max_results: int = 10) -> Optional[List[Dict]]:
97 """
98 Fetch mentions for the configured user.
99
100 Args:
101 since_id: Minimum Post ID to include (for getting newer mentions)
102 max_results: Number of results to return (5-100)
103
104 Returns:
105 List of mention objects or None if request failed
106 """
107 endpoint = f"/users/{self.user_id}/mentions"
108 params = {
109 "max_results": min(max(max_results, 5), 100), # Ensure within API limits
110 "tweet.fields": "id,text,author_id,created_at,in_reply_to_user_id,referenced_tweets",
111 "user.fields": "id,name,username",
112 "expansions": "author_id,in_reply_to_user_id,referenced_tweets.id"
113 }
114
115 if since_id:
116 params["since_id"] = since_id
117
118 logger.info(f"Fetching mentions for user {self.user_id}")
119 response = self._make_request(endpoint, params)
120
121 if response:
122 logger.debug(f"X API response: {response}")
123
124 if response and "data" in response:
125 mentions = response["data"]
126 logger.info(f"Retrieved {len(mentions)} mentions")
127 return mentions
128 else:
129 if response:
130 logger.info(f"No mentions in response. Full response: {response}")
131 else:
132 logger.warning("Request failed - no response received")
133 return []
134
135 def get_user_info(self, user_id: str) -> Optional[Dict]:
136 """Get information about a specific user."""
137 endpoint = f"/users/{user_id}"
138 params = {
139 "user.fields": "id,name,username,description,public_metrics"
140 }
141
142 response = self._make_request(endpoint, params)
143 return response.get("data") if response else None
144
145 def post_reply(self, reply_text: str, in_reply_to_tweet_id: str) -> Optional[Dict]:
146 """
147 Post a reply to a specific tweet.
148
149 Args:
150 reply_text: The text content of the reply
151 in_reply_to_tweet_id: The ID of the tweet to reply to
152
153 Returns:
154 Response data if successful, None if failed
155 """
156 endpoint = "/tweets"
157
158 payload = {
159 "text": reply_text,
160 "reply": {
161 "in_reply_to_tweet_id": in_reply_to_tweet_id
162 }
163 }
164
165 logger.info(f"Attempting to post reply with {self.auth_method} authentication")
166 result = self._make_request(endpoint, method="POST", data=payload)
167
168 if result:
169 logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
170 return result
171 else:
172 logger.error("Failed to post reply")
173 return None
174
175def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
176 """Load X configuration from config file."""
177 try:
178 with open(config_path, 'r') as f:
179 config = yaml.safe_load(f)
180
181 x_config = config.get('x', {})
182 if not x_config.get('api_key') or not x_config.get('user_id'):
183 raise ValueError("X API key and user_id must be configured in config.yaml")
184
185 return x_config
186 except Exception as e:
187 logger.error(f"Failed to load X configuration: {e}")
188 raise
189
190def create_x_client(config_path: str = "config.yaml") -> XClient:
191 """Create and return an X client with configuration loaded from file."""
192 config = load_x_config(config_path)
193 return XClient(
194 api_key=config['api_key'],
195 user_id=config['user_id'],
196 access_token=config.get('access_token'),
197 consumer_key=config.get('consumer_key'),
198 consumer_secret=config.get('consumer_secret'),
199 access_token_secret=config.get('access_token_secret')
200 )
201
202def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
203 """
204 Convert a mention object to a YAML string for better AI comprehension.
205 Similar to thread_to_yaml_string in bsky_utils.py
206 """
207 # Extract relevant fields
208 simplified_mention = {
209 'id': mention.get('id'),
210 'text': mention.get('text'),
211 'author_id': mention.get('author_id'),
212 'created_at': mention.get('created_at'),
213 'in_reply_to_user_id': mention.get('in_reply_to_user_id')
214 }
215
216 # Add user information if available
217 if users_data and mention.get('author_id') in users_data:
218 user = users_data[mention.get('author_id')]
219 simplified_mention['author'] = {
220 'username': user.get('username'),
221 'name': user.get('name')
222 }
223
224 return yaml.dump(simplified_mention, default_flow_style=False, sort_keys=False)
225
226# Simple test function
227def test_x_client():
228 """Test the X client by fetching mentions."""
229 try:
230 client = create_x_client()
231 mentions = client.get_mentions(max_results=5)
232
233 if mentions:
234 print(f"Successfully retrieved {len(mentions)} mentions:")
235 for mention in mentions:
236 print(f"- {mention.get('id')}: {mention.get('text')[:50]}...")
237 else:
238 print("No mentions retrieved")
239
240 except Exception as e:
241 print(f"Test failed: {e}")
242
243def reply_to_cameron_post():
244 """
245 Reply to Cameron's specific X post.
246
247 NOTE: This requires OAuth User Context authentication, not Bearer token.
248 Current Bearer token is Application-Only which can't post.
249 """
250 try:
251 client = create_x_client()
252
253 # Cameron's post ID from the URL: https://x.com/cameron_pfiffer/status/1950690566909710618
254 cameron_post_id = "1950690566909710618"
255
256 # Simple reply message
257 reply_text = "Hello from void! 🤖 Testing X integration."
258
259 print(f"Attempting to reply to post {cameron_post_id}")
260 print(f"Reply text: {reply_text}")
261 print("\nNOTE: This will fail with current Bearer token (Application-Only)")
262 print("Posting requires OAuth User Context authentication")
263
264 result = client.post_reply(reply_text, cameron_post_id)
265
266 if result:
267 print(f"✅ Successfully posted reply!")
268 print(f"Reply ID: {result.get('data', {}).get('id', 'Unknown')}")
269 else:
270 print("❌ Failed to post reply (expected with current auth)")
271
272 except Exception as e:
273 print(f"Reply failed: {e}")
274
275def x_notification_loop():
276 """
277 Simple X notification loop that fetches mentions and logs them.
278 Very basic version to understand the platform needs.
279 """
280 import time
281 import json
282 from pathlib import Path
283
284 logger.info("=== STARTING X NOTIFICATION LOOP ===")
285
286 try:
287 client = create_x_client()
288 logger.info("X client initialized")
289 except Exception as e:
290 logger.error(f"Failed to initialize X client: {e}")
291 return
292
293 # Track the last seen mention ID to avoid duplicates
294 last_mention_id = None
295 cycle_count = 0
296
297 # Simple loop similar to bsky.py but much more basic
298 while True:
299 try:
300 cycle_count += 1
301 logger.info(f"=== X CYCLE {cycle_count} ===")
302
303 # Fetch mentions (newer than last seen)
304 mentions = client.get_mentions(
305 since_id=last_mention_id,
306 max_results=10
307 )
308
309 if mentions:
310 logger.info(f"Found {len(mentions)} new mentions")
311
312 # Update last seen ID
313 if mentions:
314 last_mention_id = mentions[0]['id'] # Most recent first
315
316 # Process each mention (just log for now)
317 for mention in mentions:
318 logger.info(f"Mention from {mention.get('author_id')}: {mention.get('text', '')[:100]}...")
319
320 # Convert to YAML for inspection
321 yaml_mention = mention_to_yaml_string(mention)
322
323 # Save to file for inspection (temporary)
324 debug_dir = Path("x_debug")
325 debug_dir.mkdir(exist_ok=True)
326
327 mention_file = debug_dir / f"mention_{mention['id']}.yaml"
328 with open(mention_file, 'w') as f:
329 f.write(yaml_mention)
330
331 logger.info(f"Saved mention debug info to {mention_file}")
332 else:
333 logger.info("No new mentions found")
334
335 # Sleep between cycles (shorter than bsky for now)
336 logger.info("Sleeping for 60 seconds...")
337 time.sleep(60)
338
339 except KeyboardInterrupt:
340 logger.info("=== X LOOP STOPPED BY USER ===")
341 logger.info(f"Processed {cycle_count} cycles")
342 break
343 except Exception as e:
344 logger.error(f"Error in X cycle {cycle_count}: {e}")
345 logger.info("Sleeping for 120 seconds due to error...")
346 time.sleep(120)
347
348if __name__ == "__main__":
349 import sys
350 if len(sys.argv) > 1:
351 if sys.argv[1] == "loop":
352 x_notification_loop()
353 elif sys.argv[1] == "reply":
354 reply_to_cameron_post()
355 else:
356 print("Usage: python x.py [loop|reply]")
357 print(" loop - Run the notification monitoring loop")
358 print(" reply - Reply to Cameron's specific post")
359 else:
360 test_x_client()