a digital person for bluesky
at main 8.3 kB view raw
1"""Post tool for creating Bluesky posts.""" 2from typing import List, Optional 3from pydantic import BaseModel, Field, validator 4 5 6class PostArgs(BaseModel): 7 text: List[str] = Field( 8 ..., 9 description="List of texts to create posts (each max 300 characters). Single item creates one post, multiple items create a thread." 10 ) 11 lang: Optional[str] = Field( 12 default="en-US", 13 description="Language code for the posts (e.g., 'en-US', 'es', 'ja', 'th'). Defaults to 'en-US'" 14 ) 15 16 @validator('text') 17 def validate_text_list(cls, v): 18 if not v or len(v) == 0: 19 raise ValueError("Text list cannot be empty") 20 return v 21 22 23def create_new_bluesky_post(text: List[str], lang: str = "en-US") -> str: 24 """ 25 Create a NEW standalone post on Bluesky. This tool creates independent posts that 26 start new conversations. 27 28 IMPORTANT: This tool is ONLY for creating new posts. To reply to an existing post, 29 use reply_to_bluesky_post instead. 30 31 Args: 32 text: List of post contents (each max 300 characters). Single item creates one post, multiple items create a thread. 33 lang: Language code for the posts (e.g., 'en-US', 'es', 'ja', 'th'). Defaults to 'en-US' 34 35 Returns: 36 Success message with post URL(s) 37 38 Raises: 39 Exception: If the post fails or list is empty 40 """ 41 import os 42 import requests 43 from datetime import datetime, timezone 44 45 try: 46 # Validate input 47 if not text or len(text) == 0: 48 raise Exception("Text list cannot be empty") 49 50 # Validate character limits for all posts 51 for i, post_text in enumerate(text): 52 if len(post_text) > 300: 53 raise Exception(f"Post {i+1} exceeds 300 character limit (current: {len(post_text)} characters)") 54 55 # Get credentials from environment 56 username = os.getenv("BSKY_USERNAME") 57 password = os.getenv("BSKY_PASSWORD") 58 pds_host = os.getenv("PDS_URI", "https://bsky.social") 59 60 if not username or not password: 61 raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set") 62 63 # Create session 64 session_url = f"{pds_host}/xrpc/com.atproto.server.createSession" 65 session_data = { 66 "identifier": username, 67 "password": password 68 } 69 70 session_response = requests.post(session_url, json=session_data, timeout=10) 71 session_response.raise_for_status() 72 session = session_response.json() 73 access_token = session.get("accessJwt") 74 user_did = session.get("did") 75 76 if not access_token or not user_did: 77 raise Exception("Failed to get access token or DID from session") 78 79 # Create posts (single or thread) 80 import re 81 headers = {"Authorization": f"Bearer {access_token}"} 82 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 83 84 post_urls = [] 85 previous_post = None 86 root_post = None 87 88 for i, post_text in enumerate(text): 89 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 90 91 post_record = { 92 "$type": "app.bsky.feed.post", 93 "text": post_text, 94 "createdAt": now, 95 "langs": [lang] 96 } 97 98 # If this is part of a thread (not the first post), add reply references 99 if previous_post: 100 post_record["reply"] = { 101 "root": root_post, 102 "parent": previous_post 103 } 104 105 # Add facets for mentions and URLs 106 facets = [] 107 108 # Parse mentions - fixed to handle @ at start of text 109 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 110 text_bytes = post_text.encode("UTF-8") 111 112 for m in re.finditer(mention_regex, text_bytes): 113 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 114 # Adjust byte positions to account for the optional prefix 115 mention_start = m.start(1) 116 mention_end = m.end(1) 117 try: 118 resolve_resp = requests.get( 119 f"{pds_host}/xrpc/com.atproto.identity.resolveHandle", 120 params={"handle": handle}, 121 timeout=5 122 ) 123 if resolve_resp.status_code == 200: 124 did = resolve_resp.json()["did"] 125 facets.append({ 126 "index": { 127 "byteStart": mention_start, 128 "byteEnd": mention_end, 129 }, 130 "features": [{"$type": "app.bsky.richtext.facet#mention", "did": did}], 131 }) 132 except: 133 continue 134 135 # Parse URLs - fixed to handle URLs at start of text 136 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 137 138 for m in re.finditer(url_regex, text_bytes): 139 url = m.group(1).decode("UTF-8") 140 # Adjust byte positions to account for the optional prefix 141 url_start = m.start(1) 142 url_end = m.end(1) 143 facets.append({ 144 "index": { 145 "byteStart": url_start, 146 "byteEnd": url_end, 147 }, 148 "features": [{"$type": "app.bsky.richtext.facet#link", "uri": url}], 149 }) 150 151 # Parse hashtags 152 hashtag_regex = rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)" 153 154 for m in re.finditer(hashtag_regex, text_bytes): 155 tag = m.group(1).decode("UTF-8") # Get tag without # prefix 156 # Get byte positions for the entire hashtag including # 157 tag_start = m.start(0) 158 # Adjust start if there's a space/prefix 159 if text_bytes[tag_start:tag_start+1] in (b' ', b'$'): 160 tag_start += 1 161 tag_end = m.end(0) 162 facets.append({ 163 "index": { 164 "byteStart": tag_start, 165 "byteEnd": tag_end, 166 }, 167 "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": tag}], 168 }) 169 170 if facets: 171 post_record["facets"] = facets 172 173 # Create the post 174 create_data = { 175 "repo": user_did, 176 "collection": "app.bsky.feed.post", 177 "record": post_record 178 } 179 180 post_response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 181 post_response.raise_for_status() 182 result = post_response.json() 183 184 post_uri = result.get("uri") 185 post_cid = result.get("cid") 186 handle = session.get("handle", username) 187 rkey = post_uri.split("/")[-1] if post_uri else "" 188 post_url = f"https://bsky.app/profile/{handle}/post/{rkey}" 189 post_urls.append(post_url) 190 191 # Set up references for thread continuation 192 previous_post = {"uri": post_uri, "cid": post_cid} 193 if i == 0: 194 root_post = previous_post 195 196 # Return appropriate message based on single post or thread 197 if len(text) == 1: 198 return f"Successfully posted to Bluesky!\nPost URL: {post_urls[0]}\nText: {text[0]}\nLanguage: {lang}" 199 else: 200 urls_text = "\n".join([f"Post {i+1}: {url}" for i, url in enumerate(post_urls)]) 201 return f"Successfully created thread with {len(text)} posts!\n{urls_text}\nLanguage: {lang}" 202 203 except Exception as e: 204 raise Exception(f"Error posting to Bluesky: {str(e)}")