a digital person for bluesky
at main 4.5 kB view raw
1"""Search tool for Bluesky posts.""" 2from pydantic import BaseModel, Field 3from typing import Optional 4 5 6class SearchArgs(BaseModel): 7 query: str = Field(..., description="Search query string") 8 max_results: int = Field(default=25, description="Maximum number of results to return (max 100)") 9 author: Optional[str] = Field(None, description="Filter by author handle (e.g., 'user.bsky.social')") 10 sort: str = Field(default="latest", description="Sort order: 'latest' or 'top'") 11 12 13def search_bluesky_posts(query: str, max_results: int = 25, author: str = None, sort: str = "latest") -> str: 14 """ 15 Search for posts on Bluesky matching the given criteria. 16 17 Args: 18 query: Search query string 19 max_results: Maximum number of results to return (max 100) 20 author: Filter by author handle (e.g., 'user.bsky.social') 21 sort: Sort order: 'latest' or 'top' 22 23 Returns: 24 YAML-formatted search results with posts and metadata 25 """ 26 import os 27 import yaml 28 import requests 29 from datetime import datetime 30 31 try: 32 # Validate inputs 33 max_results = min(max_results, 100) 34 if sort not in ["latest", "top"]: 35 sort = "latest" 36 37 # Build search query 38 search_query = query 39 if author: 40 search_query = f"from:{author} {query}" 41 42 # Get credentials from environment 43 username = os.getenv("BSKY_USERNAME") 44 password = os.getenv("BSKY_PASSWORD") 45 pds_host = os.getenv("PDS_URI", "https://bsky.social") 46 47 if not username or not password: 48 raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set") 49 50 # Create session 51 session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 52 session_data = { 53 "identifier": username, 54 "password": password 55 } 56 57 try: 58 session_response = requests.post(session_url, json=session_data, timeout=10) 59 session_response.raise_for_status() 60 session = session_response.json() 61 access_token = session.get("accessJwt") 62 63 if not access_token: 64 raise Exception("Failed to get access token from session") 65 except Exception as e: 66 raise Exception(f"Authentication failed. ({str(e)})") 67 68 # Search posts 69 headers = {"Authorization": f"Bearer {access_token}"} 70 search_url = f"{pds_host}/xrpc/app.bsky.feed.searchPosts" 71 params = { 72 "q": search_query, 73 "limit": max_results, 74 "sort": sort 75 } 76 77 try: 78 response = requests.get(search_url, headers=headers, params=params, timeout=10) 79 response.raise_for_status() 80 search_data = response.json() 81 except Exception as e: 82 raise Exception(f"Search failed. ({str(e)})") 83 84 # Format results 85 results = [] 86 for post in search_data.get("posts", []): 87 author = post.get("author", {}) 88 record = post.get("record", {}) 89 90 post_data = { 91 "author": { 92 "handle": author.get("handle", ""), 93 "display_name": author.get("displayName", ""), 94 }, 95 "text": record.get("text", ""), 96 "created_at": record.get("createdAt", ""), 97 "uri": post.get("uri", ""), 98 "cid": post.get("cid", ""), 99 "like_count": post.get("likeCount", 0), 100 "repost_count": post.get("repostCount", 0), 101 "reply_count": post.get("replyCount", 0), 102 } 103 104 # Add reply info if present 105 if "reply" in record and record["reply"]: 106 post_data["reply_to"] = { 107 "uri": record["reply"].get("parent", {}).get("uri", ""), 108 "cid": record["reply"].get("parent", {}).get("cid", ""), 109 } 110 111 results.append(post_data) 112 113 return yaml.dump({ 114 "search_results": { 115 "query": query, 116 "author_filter": author, 117 "sort": sort, 118 "result_count": len(results), 119 "posts": results 120 } 121 }, default_flow_style=False, sort_keys=False) 122 123 except Exception as e: 124 raise Exception(f"Error searching Bluesky: {str(e)}")