+71
-33
x.py
+71
-33
x.py
···
5
5
import json
6
6
from typing import Optional, Dict, Any, List
7
7
from datetime import datetime
8
+
from requests_oauthlib import OAuth1
8
9
9
10
# Configure logging
10
11
logging.basicConfig(
···
15
16
class XClient:
16
17
"""X (Twitter) API client for fetching mentions and managing interactions."""
17
18
18
-
def __init__(self, api_key: str, user_id: str):
19
+
def __init__(self, api_key: str, user_id: str, access_token: str = None,
20
+
consumer_key: str = None, consumer_secret: str = None,
21
+
access_token_secret: str = None):
19
22
self.api_key = api_key
23
+
self.access_token = access_token
20
24
self.user_id = user_id
21
25
self.base_url = "https://api.x.com/2"
22
-
self.headers = {
23
-
"Authorization": f"Bearer {api_key}",
24
-
"Content-Type": "application/json"
25
-
}
26
+
27
+
# Check if we have OAuth 1.0a credentials
28
+
if (consumer_key and consumer_secret and access_token and access_token_secret):
29
+
# Use OAuth 1.0a for User Context
30
+
self.oauth = OAuth1(
31
+
consumer_key,
32
+
client_secret=consumer_secret,
33
+
resource_owner_key=access_token,
34
+
resource_owner_secret=access_token_secret
35
+
)
36
+
self.headers = {"Content-Type": "application/json"}
37
+
self.auth_method = "oauth1a"
38
+
logger.info("Using OAuth 1.0a User Context authentication for X API")
39
+
elif access_token:
40
+
# Use OAuth 2.0 Bearer token for User Context
41
+
self.oauth = None
42
+
self.headers = {
43
+
"Authorization": f"Bearer {access_token}",
44
+
"Content-Type": "application/json"
45
+
}
46
+
self.auth_method = "oauth2_user"
47
+
logger.info("Using OAuth 2.0 User Context access token for X API")
48
+
else:
49
+
# Use Application-Only Bearer token
50
+
self.oauth = None
51
+
self.headers = {
52
+
"Authorization": f"Bearer {api_key}",
53
+
"Content-Type": "application/json"
54
+
}
55
+
self.auth_method = "bearer"
56
+
logger.info("Using Application-Only Bearer token for X API")
26
57
27
-
def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
58
+
def _make_request(self, endpoint: str, params: Optional[Dict] = None, method: str = "GET", data: Optional[Dict] = None) -> Optional[Dict]:
28
59
"""Make a request to the X API with proper error handling."""
29
60
url = f"{self.base_url}{endpoint}"
30
61
31
62
try:
32
-
response = requests.get(url, headers=self.headers, params=params)
63
+
if method.upper() == "GET":
64
+
if self.oauth:
65
+
response = requests.get(url, headers=self.headers, params=params, auth=self.oauth)
66
+
else:
67
+
response = requests.get(url, headers=self.headers, params=params)
68
+
elif method.upper() == "POST":
69
+
if self.oauth:
70
+
response = requests.post(url, headers=self.headers, json=data, auth=self.oauth)
71
+
else:
72
+
response = requests.post(url, headers=self.headers, json=data)
73
+
else:
74
+
raise ValueError(f"Unsupported HTTP method: {method}")
75
+
33
76
response.raise_for_status()
34
77
return response.json()
35
78
except requests.exceptions.HTTPError as e:
36
79
if response.status_code == 401:
37
-
logger.error("X API authentication failed - check your bearer token")
80
+
logger.error(f"X API authentication failed with {self.auth_method} - check your credentials")
81
+
logger.error(f"Response: {response.text}")
82
+
elif response.status_code == 403:
83
+
logger.error(f"X API forbidden with {self.auth_method} - check app permissions")
84
+
logger.error(f"Response: {response.text}")
38
85
elif response.status_code == 429:
39
86
logger.error("X API rate limit exceeded")
87
+
logger.error(f"Response: {response.text}")
40
88
else:
41
89
logger.error(f"X API request failed: {e}")
90
+
logger.error(f"Response: {response.text}")
42
91
return None
43
92
except Exception as e:
44
93
logger.error(f"Unexpected error making X API request: {e}")
···
113
162
}
114
163
}
115
164
116
-
try:
117
-
url = f"{self.base_url}{endpoint}"
118
-
response = requests.post(url, headers=self.headers, json=payload)
119
-
response.raise_for_status()
120
-
121
-
result = response.json()
165
+
logger.info(f"Attempting to post reply with {self.auth_method} authentication")
166
+
result = self._make_request(endpoint, method="POST", data=payload)
167
+
168
+
if result:
122
169
logger.info(f"Successfully posted reply to tweet {in_reply_to_tweet_id}")
123
170
return result
124
-
125
-
except requests.exceptions.HTTPError as e:
126
-
if response.status_code == 401:
127
-
logger.error("X API authentication failed for posting - check your bearer token")
128
-
logger.error(f"Response: {response.text}")
129
-
elif response.status_code == 403:
130
-
logger.error("X API posting forbidden - likely app permissions issue")
131
-
logger.error("Check that your X app has 'Read and Write' permissions enabled")
132
-
logger.error(f"Response: {response.text}")
133
-
elif response.status_code == 429:
134
-
logger.error("X API rate limit exceeded for posting")
135
-
logger.error(f"Response: {response.text}")
136
-
else:
137
-
logger.error(f"X API post request failed: {e}")
138
-
logger.error(f"Response content: {response.text}")
139
-
return None
140
-
except Exception as e:
141
-
logger.error(f"Unexpected error posting to X: {e}")
171
+
else:
172
+
logger.error("Failed to post reply")
142
173
return None
143
174
144
175
def load_x_config(config_path: str = "config.yaml") -> Dict[str, str]:
···
159
190
def create_x_client(config_path: str = "config.yaml") -> XClient:
160
191
"""Create and return an X client with configuration loaded from file."""
161
192
config = load_x_config(config_path)
162
-
return XClient(config['api_key'], config['user_id'])
193
+
return XClient(
194
+
api_key=config['api_key'],
195
+
user_id=config['user_id'],
196
+
access_token=config.get('access_token'),
197
+
consumer_key=config.get('consumer_key'),
198
+
consumer_secret=config.get('consumer_secret'),
199
+
access_token_secret=config.get('access_token_secret')
200
+
)
163
201
164
202
def mention_to_yaml_string(mention: Dict, users_data: Optional[Dict] = None) -> str:
165
203
"""