a digital person for bluesky
42
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 4ed3b0dd135addf4c728c174b886fdd87bb4851c 124 lines 4.5 kB view raw
"""Search tool for Bluesky posts."""
from pydantic import BaseModel, Field
from typing import Optional


# Argument schema mirroring the parameters of search_bluesky_posts.
class SearchArgs(BaseModel):
    query: str = Field(..., description="Search query string")
    max_results: int = Field(default=25, description="Maximum number of results to return (max 100)")
    author: Optional[str] = Field(None, description="Filter by author handle (e.g., 'user.bsky.social')")
    sort: str = Field(default="latest", description="Sort order: 'latest' or 'top'")


def search_bluesky_posts(query: str, max_results: int = 25, author: str = None, sort: str = "latest") -> str:
    """
    Search for posts on Bluesky matching the given criteria.

    Logs in with the credentials found in the BSKY_USERNAME / BSKY_PASSWORD
    environment variables (PDS host taken from PDS_URI, defaulting to
    https://bsky.social), runs the search, and formats the posts as YAML.

    Args:
        query: Search query string
        max_results: Maximum number of results to return (max 100)
        author: Filter by author handle (e.g., 'user.bsky.social')
        sort: Sort order: 'latest' or 'top'

    Returns:
        YAML-formatted search results with posts and metadata

    Raises:
        Exception: If credentials are missing, authentication fails, or the
            search request fails. The message is prefixed with
            "Error searching Bluesky:" and the original error is chained
            as the cause.
    """
    # Imports stay function-local, as in the original block.
    import os
    import yaml
    import requests

    try:
        # Validate inputs: clamp the limit to 1..100 (values below 1 would
        # otherwise be forwarded to the API unchanged) and fall back to
        # "latest" for unknown sort orders.
        max_results = max(1, min(max_results, 100))
        if sort not in ("latest", "top"):
            sort = "latest"

        # Build search query; the author filter is expressed with the
        # "from:<handle>" search operator.
        search_query = f"from:{author} {query}" if author else query

        # Get credentials from environment
        username = os.getenv("BSKY_USERNAME")
        password = os.getenv("BSKY_PASSWORD")
        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        if not username or not password:
            raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set")

        # Create session
        session_url = f"{pds_host}/xrpc/com.atproto.server.createSession"
        session_data = {
            "identifier": username,
            "password": password,
        }

        try:
            session_response = requests.post(session_url, json=session_data, timeout=10)
            session_response.raise_for_status()
            session = session_response.json()
            access_token = session.get("accessJwt")

            if not access_token:
                raise Exception("Failed to get access token from session")
        except Exception as e:
            raise Exception(f"Authentication failed. ({str(e)})") from e

        # Search posts
        headers = {"Authorization": f"Bearer {access_token}"}
        search_url = f"{pds_host}/xrpc/app.bsky.feed.searchPosts"
        params = {
            "q": search_query,
            "limit": max_results,
            "sort": sort,
        }

        try:
            response = requests.get(search_url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            search_data = response.json()
        except Exception as e:
            raise Exception(f"Search failed. ({str(e)})") from e

        # Format results
        results = []
        for post in search_data.get("posts", []):
            # Use a distinct name here: rebinding `author` would clobber the
            # function argument that is echoed back as "author_filter" below.
            post_author = post.get("author", {})
            record = post.get("record", {})

            post_data = {
                "author": {
                    "handle": post_author.get("handle", ""),
                    "display_name": post_author.get("displayName", ""),
                },
                "text": record.get("text", ""),
                "created_at": record.get("createdAt", ""),
                "uri": post.get("uri", ""),
                "cid": post.get("cid", ""),
                "like_count": post.get("likeCount", 0),
                "repost_count": post.get("repostCount", 0),
                "reply_count": post.get("replyCount", 0),
            }

            # Add reply info if present
            reply = record.get("reply")
            if reply:
                parent = reply.get("parent", {})
                post_data["reply_to"] = {
                    "uri": parent.get("uri", ""),
                    "cid": parent.get("cid", ""),
                }

            results.append(post_data)

        return yaml.dump({
            "search_results": {
                "query": query,
                "author_filter": author,
                "sort": sort,
                "result_count": len(results),
                "posts": results,
            }
        }, default_flow_style=False, sort_keys=False)

    except Exception as e:
        raise Exception(f"Error searching Bluesky: {str(e)}") from e