a digital person for bluesky
1"""Search tool for Bluesky posts."""
2from pydantic import BaseModel, Field
3from typing import Optional
4
5
6class SearchArgs(BaseModel):
7 query: str = Field(..., description="Search query string")
8 max_results: int = Field(default=25, description="Maximum number of results to return (max 100)")
9 author: Optional[str] = Field(None, description="Filter by author handle (e.g., 'user.bsky.social')")
10 sort: str = Field(default="latest", description="Sort order: 'latest' or 'top'")
11
12
13def search_bluesky_posts(query: str, max_results: int = 25, author: str = None, sort: str = "latest") -> str:
14 """
15 Search for posts on Bluesky matching the given criteria.
16
17 Args:
18 query: Search query string
19 max_results: Maximum number of results to return (max 100)
20 author: Filter by author handle (e.g., 'user.bsky.social')
21 sort: Sort order: 'latest' or 'top'
22
23 Returns:
24 YAML-formatted search results with posts and metadata
25 """
26 import os
27 import yaml
28 import requests
29 from datetime import datetime
30
31 try:
32 # Validate inputs
33 max_results = min(max_results, 100)
34 if sort not in ["latest", "top"]:
35 sort = "latest"
36
37 # Build search query
38 search_query = query
39 if author:
40 search_query = f"from:{author} {query}"
41
42 # Get credentials from environment
43 username = os.getenv("BSKY_USERNAME")
44 password = os.getenv("BSKY_PASSWORD")
45 pds_host = os.getenv("PDS_URI", "https://bsky.social")
46
47 if not username or not password:
48 raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set")
49
50 # Create session
51 session_url = f"{pds_host}/xrpc/com.atproto.server.createSession"
52 session_data = {
53 "identifier": username,
54 "password": password
55 }
56
57 try:
58 session_response = requests.post(session_url, json=session_data, timeout=10)
59 session_response.raise_for_status()
60 session = session_response.json()
61 access_token = session.get("accessJwt")
62
63 if not access_token:
64 raise Exception("Failed to get access token from session")
65 except Exception as e:
66 raise Exception(f"Authentication failed. ({str(e)})")
67
68 # Search posts
69 headers = {"Authorization": f"Bearer {access_token}"}
70 search_url = f"{pds_host}/xrpc/app.bsky.feed.searchPosts"
71 params = {
72 "q": search_query,
73 "limit": max_results,
74 "sort": sort
75 }
76
77 try:
78 response = requests.get(search_url, headers=headers, params=params, timeout=10)
79 response.raise_for_status()
80 search_data = response.json()
81 except Exception as e:
82 raise Exception(f"Search failed. ({str(e)})")
83
84 # Format results
85 results = []
86 for post in search_data.get("posts", []):
87 author = post.get("author", {})
88 record = post.get("record", {})
89
90 post_data = {
91 "author": {
92 "handle": author.get("handle", ""),
93 "display_name": author.get("displayName", ""),
94 },
95 "text": record.get("text", ""),
96 "created_at": record.get("createdAt", ""),
97 "uri": post.get("uri", ""),
98 "cid": post.get("cid", ""),
99 "like_count": post.get("likeCount", 0),
100 "repost_count": post.get("repostCount", 0),
101 "reply_count": post.get("replyCount", 0),
102 }
103
104 # Add reply info if present
105 if "reply" in record and record["reply"]:
106 post_data["reply_to"] = {
107 "uri": record["reply"].get("parent", {}).get("uri", ""),
108 "cid": record["reply"].get("parent", {}).get("cid", ""),
109 }
110
111 results.append(post_data)
112
113 return yaml.dump({
114 "search_results": {
115 "query": query,
116 "author_filter": author,
117 "sort": sort,
118 "result_count": len(results),
119 "posts": results
120 }
121 }, default_flow_style=False, sort_keys=False)
122
123 except Exception as e:
124 raise Exception(f"Error searching Bluesky: {str(e)}")