a digital person for bluesky

Refactor tools from class-based to function-based implementation

Convert SearchBlueskyTool, PostToBlueskyTool, and GetBlueskyFeedTool
from BaseTool classes to standalone functions. This simplifies the
tool architecture and aligns with the updated tool registration system.

Also update error handling to throw exceptions instead of returning
error strings, following the new coding principles.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+325 -362
tools
+121 -119
tools/feed.py
··· 1 """Feed tool for retrieving Bluesky feeds.""" 2 - from typing import List, Type, Optional 3 from pydantic import BaseModel, Field 4 - from letta_client.client import BaseTool 5 6 7 class FeedArgs(BaseModel): ··· 9 max_posts: int = Field(default=25, description="Maximum number of posts to retrieve (max 100)") 10 11 12 - class GetBlueskyFeedTool(BaseTool): 13 - name: str = "get_bluesky_feed" 14 - args_schema: Type[BaseModel] = FeedArgs 15 - description: str = "Retrieve a Bluesky feed (home timeline or custom feed)" 16 - tags: List[str] = ["bluesky", "feed", "timeline"] 17 18 - def run(self, feed_uri: Optional[str] = None, max_posts: int = 25) -> str: 19 - """Retrieve a Bluesky feed.""" 20 - import os 21 - import yaml 22 - import requests 23 24 try: 25 - # Validate inputs 26 - max_posts = min(max_posts, 100) 27 28 - # Get credentials from environment 29 - username = os.getenv("BSKY_USERNAME") 30 - password = os.getenv("BSKY_PASSWORD") 31 - pds_host = os.getenv("PDS_URI", "https://bsky.social") 32 - 33 - if not username or not password: 34 - return "Error: BSKY_USERNAME and BSKY_PASSWORD environment variables must be set" 35 - 36 - # Create session 37 - session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 38 - session_data = { 39 - "identifier": username, 40 - "password": password 41 } 42 - 43 - try: 44 - session_response = requests.post(session_url, json=session_data, timeout=10) 45 - session_response.raise_for_status() 46 - session = session_response.json() 47 - access_token = session.get("accessJwt") 48 - 49 - if not access_token: 50 - return "Error: Failed to get access token from session" 51 - except Exception as e: 52 - return f"Error: Authentication failed. ({str(e)})" 53 - 54 - # Get feed 55 - headers = {"Authorization": f"Bearer {access_token}"} 56 - 57 - if feed_uri: 58 - # Custom feed 59 - feed_url = f"{pds_host}/xrpc/app.bsky.feed.getFeed" 60 - params = { 61 - "feed": feed_uri, 62 - "limit": max_posts 63 - } 64 - feed_type = "custom" 65 - feed_name = feed_uri.split('/')[-1] if '/' in feed_uri else feed_uri 66 - else: 67 - # Home timeline 68 - feed_url = f"{pds_host}/xrpc/app.bsky.feed.getTimeline" 69 - params = { 70 - "limit": max_posts 71 - } 72 - feed_type = "home" 73 - feed_name = "timeline" 74 75 - try: 76 - response = requests.get(feed_url, headers=headers, params=params, timeout=10) 77 - response.raise_for_status() 78 - feed_data = response.json() 79 - except Exception as e: 80 - return f"Error: Failed to get feed. ({str(e)})" 81 82 - # Format posts 83 - posts = [] 84 - for item in feed_data.get("feed", []): 85 - post = item.get("post", {}) 86 - author = post.get("author", {}) 87 - record = post.get("record", {}) 88 - 89 - post_data = { 90 - "author": { 91 - "handle": author.get("handle", ""), 92 - "display_name": author.get("displayName", ""), 93 - }, 94 - "text": record.get("text", ""), 95 - "created_at": record.get("createdAt", ""), 96 - "uri": post.get("uri", ""), 97 - "cid": post.get("cid", ""), 98 - "like_count": post.get("likeCount", 0), 99 - "repost_count": post.get("repostCount", 0), 100 - "reply_count": post.get("replyCount", 0), 101 - } 102 - 103 - # Add repost info if present 104 - if "reason" in item and item["reason"]: 105 - reason = item["reason"] 106 - if reason.get("$type") == "app.bsky.feed.defs#reasonRepost": 107 - by = reason.get("by", {}) 108 - post_data["reposted_by"] = { 109 - "handle": by.get("handle", ""), 110 - "display_name": by.get("displayName", ""), 111 - } 112 - 113 - # Add reply info if present 114 - if "reply" in record and record["reply"]: 115 - parent = record["reply"].get("parent", {}) 116 - post_data["reply_to"] = { 117 - "uri": parent.get("uri", ""), 118 - "cid": parent.get("cid", ""), 119 } 120 - 121 - posts.append(post_data) 122 123 - # Format response 124 - feed_result = { 125 - "feed": { 126 - "type": feed_type, 127 - "name": feed_name, 128 - "post_count": len(posts), 129 - "posts": posts 130 } 131 - } 132 133 - if feed_uri: 134 - feed_result["feed"]["uri"] = feed_uri 135 - 136 - return yaml.dump(feed_result, default_flow_style=False, sort_keys=False) 137 - 138 - except Exception as e: 139 - return f"Error retrieving feed: {str(e)}"
··· 1 """Feed tool for retrieving Bluesky feeds.""" 2 from pydantic import BaseModel, Field 3 + from typing import Optional 4 5 6 class FeedArgs(BaseModel): ··· 8 max_posts: int = Field(default=25, description="Maximum number of posts to retrieve (max 100)") 9 10 11 + def get_bluesky_feed(feed_uri: str = None, max_posts: int = 25) -> str: 12 + """ 13 + Retrieve a Bluesky feed (home timeline or custom feed). 14 15 + Args: 16 + feed_uri: Custom feed URI (e.g., 'at://did:plc:abc/app.bsky.feed.generator/feed-name'). If not provided, returns home timeline 17 + max_posts: Maximum number of posts to retrieve (max 100) 18 + 19 + Returns: 20 + YAML-formatted feed data with posts and metadata 21 + """ 22 + import os 23 + import yaml 24 + import requests 25 + 26 + try: 27 + # Validate inputs 28 + max_posts = min(max_posts, 100) 29 + 30 + # Get credentials from environment 31 + username = os.getenv("BSKY_USERNAME") 32 + password = os.getenv("BSKY_PASSWORD") 33 + pds_host = os.getenv("PDS_URI", "https://bsky.social") 34 + 35 + if not username or not password: 36 + raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set") 37 + 38 + # Create session 39 + session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 40 + session_data = { 41 + "identifier": username, 42 + "password": password 43 + } 44 45 try: 46 + session_response = requests.post(session_url, json=session_data, timeout=10) 47 + session_response.raise_for_status() 48 + session = session_response.json() 49 + access_token = session.get("accessJwt") 50 51 + if not access_token: 52 + raise Exception("Failed to get access token from session") 53 + except Exception as e: 54 + raise Exception(f"Authentication failed. ({str(e)})") 55 + 56 + # Get feed 57 + headers = {"Authorization": f"Bearer {access_token}"} 58 + 59 + if feed_uri: 60 + # Custom feed 61 + feed_url = f"{pds_host}/xrpc/app.bsky.feed.getFeed" 62 + params = { 63 + "feed": feed_uri, 64 + "limit": max_posts 65 } 66 + feed_type = "custom" 67 + feed_name = feed_uri.split('/')[-1] if '/' in feed_uri else feed_uri 68 + else: 69 + # Home timeline 70 + feed_url = f"{pds_host}/xrpc/app.bsky.feed.getTimeline" 71 + params = { 72 + "limit": max_posts 73 + } 74 + feed_type = "home" 75 + feed_name = "timeline" 76 + 77 + try: 78 + response = requests.get(feed_url, headers=headers, params=params, timeout=10) 79 + response.raise_for_status() 80 + feed_data = response.json() 81 + except Exception as e: 82 + raise Exception(f"Failed to get feed. ({str(e)})") 83 + 84 + # Format posts 85 + posts = [] 86 + for item in feed_data.get("feed", []): 87 + post = item.get("post", {}) 88 + author = post.get("author", {}) 89 + record = post.get("record", {}) 90 91 + post_data = { 92 + "author": { 93 + "handle": author.get("handle", ""), 94 + "display_name": author.get("displayName", ""), 95 + }, 96 + "text": record.get("text", ""), 97 + "created_at": record.get("createdAt", ""), 98 + "uri": post.get("uri", ""), 99 + "cid": post.get("cid", ""), 100 + "like_count": post.get("likeCount", 0), 101 + "repost_count": post.get("repostCount", 0), 102 + "reply_count": post.get("replyCount", 0), 103 + } 104 105 + # Add repost info if present 106 + if "reason" in item and item["reason"]: 107 + reason = item["reason"] 108 + if reason.get("$type") == "app.bsky.feed.defs#reasonRepost": 109 + by = reason.get("by", {}) 110 + post_data["reposted_by"] = { 111 + "handle": by.get("handle", ""), 112 + "display_name": by.get("displayName", ""), 113 } 114 115 + # Add reply info if present 116 + if "reply" in record and record["reply"]: 117 + parent = record["reply"].get("parent", {}) 118 + post_data["reply_to"] = { 119 + "uri": parent.get("uri", ""), 120 + "cid": parent.get("cid", ""), 121 } 122 123 + posts.append(post_data) 124 + 125 + # Format response 126 + feed_result = { 127 + "feed": { 128 + "type": feed_type, 129 + "name": feed_name, 130 + "post_count": len(posts), 131 + "posts": posts 132 + } 133 + } 134 + 135 + if feed_uri: 136 + feed_result["feed"]["uri"] = feed_uri 137 + 138 + return yaml.dump(feed_result, default_flow_style=False, sort_keys=False) 139 + 140 + except Exception as e: 141 + raise Exception(f"Error retrieving feed: {str(e)}")
+100 -143
tools/post.py
··· 1 """Post tool for creating Bluesky posts.""" 2 - from typing import List, Type 3 from pydantic import BaseModel, Field 4 - from letta_client.client import BaseTool 5 6 7 class PostArgs(BaseModel): 8 text: str = Field(..., description="The text content to post (max 300 characters)") 9 10 11 - class PostToBlueskyTool(BaseTool): 12 - name: str = "post_to_bluesky" 13 - args_schema: Type[BaseModel] = PostArgs 14 - description: str = "Post a message to Bluesky" 15 - tags: List[str] = ["bluesky", "post", "create"] 16 17 - def run(self, text: str) -> str: 18 - """ 19 - Post a message to Bluesky. 20 21 - Args: 22 - text: The text content to post (max 300 characters) 23 - 24 - Returns: 25 - Success message with post URL if successful, error message if failed 26 - """ 27 - import os 28 import re 29 - import requests 30 - from datetime import datetime, timezone 31 32 - try: 33 - # Validate character limit 34 - if len(text) > 300: 35 - return f"Error: Post exceeds 300 character limit (current: {len(text)} characters)" 36 - 37 - # Get credentials from environment 38 - username = os.getenv("BSKY_USERNAME") 39 - password = os.getenv("BSKY_PASSWORD") 40 - pds_host = os.getenv("PDS_URI", "https://bsky.social") 41 - 42 - if not username or not password: 43 - return "Error: BSKY_USERNAME and BSKY_PASSWORD environment variables must be set" 44 - 45 - # Create session 46 - session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 47 - session_data = { 48 - "identifier": username, 49 - "password": password 50 - } 51 - 52 try: 53 - session_response = requests.post(session_url, json=session_data, timeout=10) 54 - session_response.raise_for_status() 55 - session = session_response.json() 56 - access_token = session.get("accessJwt") 57 - user_did = session.get("did") 58 - 59 - if not access_token or not user_did: 60 - return "Error: Failed to get access token or DID from session" 61 - except Exception as e: 62 - return f"Error: Authentication failed. ({str(e)})" 63 - 64 - # Helper function to parse mentions and create facets 65 - def parse_mentions(text: str): 66 - facets = [] 67 - # Regex for mentions based on Bluesky handle syntax 68 - mention_regex = rb"[$|\W](@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 69 - text_bytes = text.encode("UTF-8") 70 - 71 - for m in re.finditer(mention_regex, text_bytes): 72 - handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 73 - 74 - # Resolve handle to DID 75 - try: 76 - resolve_resp = requests.get( 77 - f"{pds_host}/xrpc/com.atproto.identity.resolveHandle", 78 - params={"handle": handle}, 79 - timeout=5 80 - ) 81 - if resolve_resp.status_code == 200: 82 - did = resolve_resp.json()["did"] 83 - facets.append({ 84 - "index": { 85 - "byteStart": m.start(1), 86 - "byteEnd": m.end(1), 87 - }, 88 - "features": [{"$type": "app.bsky.richtext.facet#mention", "did": did}], 89 - }) 90 - except: 91 - # If handle resolution fails, skip this mention 92 - continue 93 - 94 - return facets 95 - 96 - # Helper function to parse URLs and create facets 97 - def parse_urls(text: str): 98 - facets = [] 99 - # URL regex 100 - url_regex = rb"[$|\W](https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 101 - text_bytes = text.encode("UTF-8") 102 - 103 - for m in re.finditer(url_regex, text_bytes): 104 - url = m.group(1).decode("UTF-8") 105 facets.append({ 106 "index": { 107 "byteStart": m.start(1), 108 "byteEnd": m.end(1), 109 }, 110 - "features": [ 111 - { 112 - "$type": "app.bsky.richtext.facet#link", 113 - "uri": url, 114 - } 115 - ], 116 }) 117 - 118 - return facets 119 - 120 - 121 - # Build the post record 122 - now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 123 - 124 - post_record = { 125 - "$type": "app.bsky.feed.post", 126 - "text": text, 127 - "createdAt": now, 128 - } 129 - 130 - # Add facets for mentions and links 131 - facets = parse_mentions(text) + parse_urls(text) 132 - if facets: 133 - post_record["facets"] = facets 134 - 135 - # Create the post 136 - try: 137 - create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 138 - headers = {"Authorization": f"Bearer {access_token}"} 139 - 140 - create_data = { 141 - "repo": user_did, 142 - "collection": "app.bsky.feed.post", 143 - "record": post_record 144 - } 145 - 146 - post_response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 147 - post_response.raise_for_status() 148 - result = post_response.json() 149 - 150 - post_uri = result.get("uri") 151 - # Extract handle from session if available 152 - handle = session.get("handle", username) 153 - rkey = post_uri.split("/")[-1] if post_uri else "" 154 - post_url = f"https://bsky.app/profile/{handle}/post/{rkey}" 155 - 156 - return f"Successfully posted to Bluesky!\nPost URL: {post_url}\nText: {text}" 157 - 158 - except Exception as e: 159 - return f"Error: Failed to create post. ({str(e)})" 160 - 161 - except Exception as e: 162 - return f"Error posting to Bluesky: {str(e)}"
··· 1 """Post tool for creating Bluesky posts.""" 2 from pydantic import BaseModel, Field 3 4 5 class PostArgs(BaseModel): 6 text: str = Field(..., description="The text content to post (max 300 characters)") 7 8 9 + def post_to_bluesky(text: str) -> str: 10 + """Post a message to Bluesky.""" 11 + import os 12 + import requests 13 + from datetime import datetime, timezone 14 15 + try: 16 + # Validate character limit 17 + if len(text) > 300: 18 + raise Exception(f"Post exceeds 300 character limit (current: {len(text)} characters)") 19 + 20 + # Get credentials from environment 21 + username = os.getenv("BSKY_USERNAME") 22 + password = os.getenv("BSKY_PASSWORD") 23 + pds_host = os.getenv("PDS_URI", "https://bsky.social") 24 + 25 + if not username or not password: 26 + raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set") 27 + 28 + # Create session 29 + session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 30 + session_data = { 31 + "identifier": username, 32 + "password": password 33 + } 34 35 + session_response = requests.post(session_url, json=session_data, timeout=10) 36 + session_response.raise_for_status() 37 + session = session_response.json() 38 + access_token = session.get("accessJwt") 39 + user_did = session.get("did") 40 + 41 + if not access_token or not user_did: 42 + raise Exception("Failed to get access token or DID from session") 43 + 44 + # Build post record with facets for mentions and URLs 45 + now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 46 + 47 + post_record = { 48 + "$type": "app.bsky.feed.post", 49 + "text": text, 50 + "createdAt": now, 51 + } 52 + 53 + # Add facets for mentions and URLs 54 import re 55 + facets = [] 56 57 + # Parse mentions 58 + mention_regex = rb"[$|\W](@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 59 + text_bytes = text.encode("UTF-8") 60 + 61 + for m in re.finditer(mention_regex, text_bytes): 62 + handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 63 try: 64 + resolve_resp = requests.get( 65 + f"{pds_host}/xrpc/com.atproto.identity.resolveHandle", 66 + params={"handle": handle}, 67 + timeout=5 68 + ) 69 + if resolve_resp.status_code == 200: 70 + did = resolve_resp.json()["did"] 71 facets.append({ 72 "index": { 73 "byteStart": m.start(1), 74 "byteEnd": m.end(1), 75 }, 76 + "features": [{"$type": "app.bsky.richtext.facet#mention", "did": did}], 77 }) 78 + except: 79 + continue 80 + 81 + # Parse URLs 82 + url_regex = rb"[$|\W](https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 83 + 84 + for m in re.finditer(url_regex, text_bytes): 85 + url = m.group(1).decode("UTF-8") 86 + facets.append({ 87 + "index": { 88 + "byteStart": m.start(1), 89 + "byteEnd": m.end(1), 90 + }, 91 + "features": [{"$type": "app.bsky.richtext.facet#link", "uri": url}], 92 + }) 93 + 94 + if facets: 95 + post_record["facets"] = facets 96 + 97 + # Create the post 98 + create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 99 + headers = {"Authorization": f"Bearer {access_token}"} 100 + 101 + create_data = { 102 + "repo": user_did, 103 + "collection": "app.bsky.feed.post", 104 + "record": post_record 105 + } 106 + 107 + post_response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 108 + post_response.raise_for_status() 109 + result = post_response.json() 110 + 111 + post_uri = result.get("uri") 112 + handle = session.get("handle", username) 113 + rkey = post_uri.split("/")[-1] if post_uri else "" 114 + post_url = f"https://bsky.app/profile/{handle}/post/{rkey}" 115 + 116 + return f"Successfully posted to Bluesky!\nPost URL: {post_url}\nText: {text}" 117 + 118 + except Exception as e: 119 + raise Exception(f"Error posting to Bluesky: {str(e)}")
+104 -100
tools/search.py
··· 1 """Search tool for Bluesky posts.""" 2 - from typing import List, Type, Optional 3 from pydantic import BaseModel, Field 4 - from letta_client.client import BaseTool 5 6 7 class SearchArgs(BaseModel): ··· 11 sort: str = Field(default="latest", description="Sort order: 'latest' or 'top'") 12 13 14 - class SearchBlueskyTool(BaseTool): 15 - name: str = "search_bluesky_posts" 16 - args_schema: Type[BaseModel] = SearchArgs 17 - description: str = "Search for posts on Bluesky matching the given criteria" 18 - tags: List[str] = ["bluesky", "search", "posts"] 19 20 - def run(self, query: str, max_results: int = 25, author: Optional[str] = None, sort: str = "latest") -> str: 21 - """Search for posts on Bluesky.""" 22 - import os 23 - import yaml 24 - import requests 25 - from datetime import datetime 26 27 try: 28 - # Validate inputs 29 - max_results = min(max_results, 100) 30 - if sort not in ["latest", "top"]: 31 - sort = "latest" 32 - 33 - # Build search query 34 - search_query = query 35 - if author: 36 - search_query = f"from:{author} {query}" 37 38 - # Get credentials from environment 39 - username = os.getenv("BSKY_USERNAME") 40 - password = os.getenv("BSKY_PASSWORD") 41 - pds_host = os.getenv("PDS_URI", "https://bsky.social") 42 - 43 - if not username or not password: 44 - return "Error: BSKY_USERNAME and BSKY_PASSWORD environment variables must be set" 45 - 46 - # Create session 47 - session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 48 - session_data = { 49 - "identifier": username, 50 - "password": password 51 - } 52 - 53 - try: 54 - session_response = requests.post(session_url, json=session_data, timeout=10) 55 - session_response.raise_for_status() 56 - session = session_response.json() 57 - access_token = session.get("accessJwt") 58 - 59 - if not access_token: 60 - return "Error: Failed to get access token from session" 61 - except Exception as e: 62 - return f"Error: Authentication failed. ({str(e)})" 63 64 - # Search posts 65 - headers = {"Authorization": f"Bearer {access_token}"} 66 - search_url = f"{pds_host}/xrpc/app.bsky.feed.searchPosts" 67 - params = { 68 - "q": search_query, 69 - "limit": max_results, 70 - "sort": sort 71 } 72 73 - try: 74 - response = requests.get(search_url, headers=headers, params=params, timeout=10) 75 - response.raise_for_status() 76 - search_data = response.json() 77 - except Exception as e: 78 - return f"Error: Search failed. ({str(e)})" 79 - 80 - # Format results 81 - results = [] 82 - for post in search_data.get("posts", []): 83 - author = post.get("author", {}) 84 - record = post.get("record", {}) 85 - 86 - post_data = { 87 - "author": { 88 - "handle": author.get("handle", ""), 89 - "display_name": author.get("displayName", ""), 90 - }, 91 - "text": record.get("text", ""), 92 - "created_at": record.get("createdAt", ""), 93 - "uri": post.get("uri", ""), 94 - "cid": post.get("cid", ""), 95 - "like_count": post.get("likeCount", 0), 96 - "repost_count": post.get("repostCount", 0), 97 - "reply_count": post.get("replyCount", 0), 98 } 99 - 100 - # Add reply info if present 101 - if "reply" in record and record["reply"]: 102 - post_data["reply_to"] = { 103 - "uri": record["reply"].get("parent", {}).get("uri", ""), 104 - "cid": record["reply"].get("parent", {}).get("cid", ""), 105 - } 106 - 107 - results.append(post_data) 108 109 - return yaml.dump({ 110 - "search_results": { 111 - "query": query, 112 - "author_filter": author, 113 - "sort": sort, 114 - "result_count": len(results), 115 - "posts": results 116 - } 117 - }, default_flow_style=False, sort_keys=False) 118 - 119 - except Exception as e: 120 - return f"Error searching Bluesky: {str(e)}"
··· 1 """Search tool for Bluesky posts.""" 2 from pydantic import BaseModel, Field 3 + from typing import Optional 4 5 6 class SearchArgs(BaseModel): ··· 10 sort: str = Field(default="latest", description="Sort order: 'latest' or 'top'") 11 12 13 + def search_bluesky_posts(query: str, max_results: int = 25, author: str = None, sort: str = "latest") -> str: 14 + """ 15 + Search for posts on Bluesky matching the given criteria. 16 17 + Args: 18 + query: Search query string 19 + max_results: Maximum number of results to return (max 100) 20 + author: Filter by author handle (e.g., 'user.bsky.social') 21 + sort: Sort order: 'latest' or 'top' 22 + 23 + Returns: 24 + YAML-formatted search results with posts and metadata 25 + """ 26 + import os 27 + import yaml 28 + import requests 29 + from datetime import datetime 30 + 31 + try: 32 + # Validate inputs 33 + max_results = min(max_results, 100) 34 + if sort not in ["latest", "top"]: 35 + sort = "latest" 36 + 37 + # Build search query 38 + search_query = query 39 + if author: 40 + search_query = f"from:{author} {query}" 41 + 42 + # Get credentials from environment 43 + username = os.getenv("BSKY_USERNAME") 44 + password = os.getenv("BSKY_PASSWORD") 45 + pds_host = os.getenv("PDS_URI", "https://bsky.social") 46 + 47 + if not username or not password: 48 + raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set") 49 + 50 + # Create session 51 + session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 52 + session_data = { 53 + "identifier": username, 54 + "password": password 55 + } 56 57 try: 58 + session_response = requests.post(session_url, json=session_data, timeout=10) 59 + session_response.raise_for_status() 60 + session = session_response.json() 61 + access_token = session.get("accessJwt") 62 63 + if not access_token: 64 + raise Exception("Failed to get access token from session") 65 + except Exception as e: 66 + raise Exception(f"Authentication failed. ({str(e)})") 67 + 68 + # Search posts 69 + headers = {"Authorization": f"Bearer {access_token}"} 70 + search_url = f"{pds_host}/xrpc/app.bsky.feed.searchPosts" 71 + params = { 72 + "q": search_query, 73 + "limit": max_results, 74 + "sort": sort 75 + } 76 + 77 + try: 78 + response = requests.get(search_url, headers=headers, params=params, timeout=10) 79 + response.raise_for_status() 80 + search_data = response.json() 81 + except Exception as e: 82 + raise Exception(f"Search failed. ({str(e)})") 83 + 84 + # Format results 85 + results = [] 86 + for post in search_data.get("posts", []): 87 + author = post.get("author", {}) 88 + record = post.get("record", {}) 89 90 + post_data = { 91 + "author": { 92 + "handle": author.get("handle", ""), 93 + "display_name": author.get("displayName", ""), 94 + }, 95 + "text": record.get("text", ""), 96 + "created_at": record.get("createdAt", ""), 97 + "uri": post.get("uri", ""), 98 + "cid": post.get("cid", ""), 99 + "like_count": post.get("likeCount", 0), 100 + "repost_count": post.get("repostCount", 0), 101 + "reply_count": post.get("replyCount", 0), 102 } 103 104 + # Add reply info if present 105 + if "reply" in record and record["reply"]: 106 + post_data["reply_to"] = { 107 + "uri": record["reply"].get("parent", {}).get("uri", ""), 108 + "cid": record["reply"].get("parent", {}).get("cid", ""), 109 } 110 111 + results.append(post_data) 112 + 113 + return yaml.dump({ 114 + "search_results": { 115 + "query": query, 116 + "author_filter": author, 117 + "sort": sort, 118 + "result_count": len(results), 119 + "posts": results 120 + } 121 + }, default_flow_style=False, sort_keys=False) 122 + 123 + except Exception as e: 124 + raise Exception(f"Error searching Bluesky: {str(e)}")