a digital person for bluesky
1"""Search tool for X (Twitter) posts."""
2from pydantic import BaseModel, Field
3from typing import Optional
4
5
6class SearchXArgs(BaseModel):
7 username: str = Field(..., description="X username to get recent posts from (without @)")
8 max_results: int = Field(default=10, description="Maximum number of posts to return (max 100)")
9 exclude_replies: bool = Field(default=False, description="Whether to exclude replies")
10 exclude_retweets: bool = Field(default=False, description="Whether to exclude retweets")
11
12
13def search_x_posts(username: str, max_results: int = 10, exclude_replies: bool = False, exclude_retweets: bool = False) -> str:
14 """
15 Get recent posts from a specific X (Twitter) user.
16
17 Args:
18 username: X username to get posts from (without @)
19 max_results: Maximum number of posts to return (max 100)
20 exclude_replies: Whether to exclude replies
21 exclude_retweets: Whether to exclude retweets
22
23 Returns:
24 YAML-formatted posts from the user
25 """
26 import os
27 import yaml
28 import requests
29 from datetime import datetime
30
31 try:
32 # Validate inputs
33 max_results = min(max_results, 100)
34
35 # Get credentials from environment
36 # These need to be set in the cloud environment
37 consumer_key = os.getenv("X_CONSUMER_KEY")
38 consumer_secret = os.getenv("X_CONSUMER_SECRET")
39 access_token = os.getenv("X_ACCESS_TOKEN")
40 access_token_secret = os.getenv("X_ACCESS_TOKEN_SECRET")
41
42 # Also check for bearer token as fallback
43 bearer_token = os.getenv("X_BEARER_TOKEN")
44
45 if not any([bearer_token, (consumer_key and consumer_secret and access_token and access_token_secret)]):
46 raise Exception("X API credentials not found in environment variables")
47
48 # First, we need to get the user ID from the username
49 base_url = "https://api.x.com/2"
50
51 # Set up authentication headers
52 if bearer_token:
53 headers = {
54 "Authorization": f"Bearer {bearer_token}",
55 "Content-Type": "application/json"
56 }
57 else:
58 # For OAuth 1.0a, we'd need requests_oauthlib
59 # Since this is a cloud function, we'll require bearer token for simplicity
60 raise Exception("Bearer token required for X API authentication in cloud environment")
61
62 # Get user ID from username
63 user_lookup_url = f"{base_url}/users/by/username/{username}"
64 user_params = {
65 "user.fields": "id,name,username,description"
66 }
67
68 try:
69 user_response = requests.get(user_lookup_url, headers=headers, params=user_params, timeout=10)
70 user_response.raise_for_status()
71 user_data = user_response.json()
72
73 if "data" not in user_data:
74 raise Exception(f"User @{username} not found")
75
76 user_id = user_data["data"]["id"]
77 user_info = user_data["data"]
78
79 except requests.exceptions.HTTPError as e:
80 if user_response.status_code == 404:
81 raise Exception(f"User @{username} not found")
82 else:
83 raise Exception(f"Failed to look up user @{username}: {str(e)}")
84
85 # Get user's recent tweets
86 tweets_url = f"{base_url}/users/{user_id}/tweets"
87
88 # Build query parameters
89 tweets_params = {
90 "max_results": max_results,
91 "tweet.fields": "id,text,author_id,created_at,referenced_tweets,conversation_id",
92 "exclude": []
93 }
94
95 # Add exclusions
96 if exclude_replies:
97 tweets_params["exclude"].append("replies")
98 if exclude_retweets:
99 tweets_params["exclude"].append("retweets")
100
101 # Join exclusions or remove if empty
102 if tweets_params["exclude"]:
103 tweets_params["exclude"] = ",".join(tweets_params["exclude"])
104 else:
105 del tweets_params["exclude"]
106
107 try:
108 tweets_response = requests.get(tweets_url, headers=headers, params=tweets_params, timeout=10)
109 tweets_response.raise_for_status()
110 tweets_data = tweets_response.json()
111 except Exception as e:
112 raise Exception(f"Failed to fetch posts from @{username}: {str(e)}")
113
114 # Format results
115 results = []
116 for tweet in tweets_data.get("data", []):
117 # Check if it's a retweet
118 is_retweet = False
119 referenced_tweets = tweet.get("referenced_tweets", [])
120 for ref in referenced_tweets:
121 if ref.get("type") == "retweeted":
122 is_retweet = True
123 break
124
125 tweet_data = {
126 "author": {
127 "handle": user_info.get("username", ""),
128 "display_name": user_info.get("name", ""),
129 },
130 "text": tweet.get("text", ""),
131 "created_at": tweet.get("created_at", ""),
132 "url": f"https://x.com/{username}/status/{tweet.get('id', '')}",
133 "id": tweet.get("id", ""),
134 "is_retweet": is_retweet
135 }
136
137 # Add conversation info if it's a reply
138 if tweet.get("conversation_id") and tweet.get("conversation_id") != tweet.get("id"):
139 tweet_data["conversation_id"] = tweet.get("conversation_id")
140
141 results.append(tweet_data)
142
143 return yaml.dump({
144 "x_user_posts": {
145 "user": {
146 "username": user_info.get("username"),
147 "name": user_info.get("name"),
148 "description": user_info.get("description", ""),
149 },
150 "post_count": len(results),
151 "posts": results
152 }
153 }, default_flow_style=False, sort_keys=False)
154
155 except Exception as e:
156 raise Exception(f"Error searching X posts: {str(e)}")