a digital person for bluesky
1"""Post tool for creating Bluesky posts."""
2from typing import List, Optional
3from pydantic import BaseModel, Field, validator
4
5
class PostArgs(BaseModel):
    # Argument schema for create_new_bluesky_post.
    #
    # `text` holds one entry per post: a single entry yields one post, several
    # entries yield a thread. `lang` is the language code attached to the posts.
    #
    # Documented with comments instead of a class docstring on purpose:
    # pydantic copies a model's docstring into the generated schema
    # description, and this rewrite should not alter that schema.
    text: List[str] = Field(
        ...,
        description="List of texts to create posts (each max 300 characters). Single item creates one post, multiple items create a thread."
    )
    lang: Optional[str] = Field(
        default="en-US",
        description="Language code for the posts (e.g., 'en-US', 'es', 'ja', 'th'). Defaults to 'en-US'"
    )

    @validator('text')
    def validate_text_list(cls, v):
        """Reject an empty list and any entry longer than 300 characters.

        The length rule mirrors the check performed by
        create_new_bluesky_post, so invalid input is rejected at the schema
        level with the same wording instead of only at call time.
        """
        if not v:
            raise ValueError("Text list cannot be empty")
        for position, entry in enumerate(v, start=1):
            if len(entry) > 300:
                raise ValueError(
                    f"Post {position} exceeds 300 character limit (current: {len(entry)} characters)"
                )
        return v
21
22
def create_new_bluesky_post(text: List[str], lang: str = "en-US") -> str:
    """
    Create a NEW standalone post on Bluesky. This tool creates independent posts that
    start new conversations.

    IMPORTANT: This tool is ONLY for creating new posts. To reply to an existing post,
    use reply_to_bluesky_post instead.

    Args:
        text: List of post contents (each max 300 characters). Single item creates one post, multiple items create a thread.
        lang: Language code for the posts (e.g., 'en-US', 'es', 'ja', 'th'). Defaults to 'en-US'

    Returns:
        Success message with post URL(s)

    Raises:
        Exception: If the post fails or list is empty. Every failure (validation,
            missing credentials, HTTP errors) is re-raised as a plain Exception whose
            message starts with "Error posting to Bluesky: "; the original error is
            attached as its cause.
    """
    # Imports are function-local, as in the original, so the function stays
    # self-contained.
    # NOTE(review): presumably this is shipped as a standalone tool; confirm
    # before hoisting these imports to module level.
    import os
    import re
    from datetime import datetime, timezone

    max_post_chars = 300

    # Compiled once instead of once per post. The patterns run on the UTF-8
    # encoded text because facet positions are sent as byte offsets
    # ("byteStart"/"byteEnd"), not character offsets.
    mention_pattern = re.compile(
        rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
    )
    url_pattern = re.compile(
        rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
    )
    hashtag_pattern = re.compile(rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)")

    try:
        # Validate input
        if not text:
            raise Exception("Text list cannot be empty")

        # Validate character limits for all posts before anything is sent, so a
        # thread is never left half-published because of a late length error.
        for i, post_text in enumerate(text):
            if len(post_text) > max_post_chars:
                raise Exception(
                    f"Post {i + 1} exceeds {max_post_chars} character limit (current: {len(post_text)} characters)"
                )

        # PostArgs declares lang as Optional[str], so None can reach this
        # function; fall back to the default rather than sending "langs": [None].
        lang = lang or "en-US"

        # Get credentials from environment
        username = os.getenv("BSKY_USERNAME")
        password = os.getenv("BSKY_PASSWORD")
        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        if not username or not password:
            raise Exception("BSKY_USERNAME and BSKY_PASSWORD environment variables must be set")

        # Deferred until the input and configuration are known to be usable, so
        # bad calls fail fast without loading the HTTP stack.
        import requests

        # Create session
        session_response = requests.post(
            f"{pds_host}/xrpc/com.atproto.server.createSession",
            json={"identifier": username, "password": password},
            timeout=10,
        )
        session_response.raise_for_status()
        session = session_response.json()
        access_token = session.get("accessJwt")
        user_did = session.get("did")

        if not access_token or not user_did:
            raise Exception("Failed to get access token or DID from session")

        # Create posts (single or thread)
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
        # Handle used in the public post URLs; falls back to the login name.
        author_handle = session.get("handle", username)

        post_urls = []
        previous_post = None  # {"uri", "cid"} of the post created just before
        root_post = None      # {"uri", "cid"} of the first post of the thread

        for post_text in text:
            now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

            post_record = {
                "$type": "app.bsky.feed.post",
                "text": post_text,
                "createdAt": now,
                "langs": [lang],
            }

            # If this is part of a thread (not the first post), add reply references
            if previous_post:
                post_record["reply"] = {
                    "root": root_post,
                    "parent": previous_post,
                }

            # Add facets for mentions, URLs and hashtags
            facets = []
            text_bytes = post_text.encode("UTF-8")

            # Mentions: group 1 is "@handle"; the optional prefix character is
            # outside the group, so start(1)/end(1) delimit exactly the mention.
            for m in mention_pattern.finditer(text_bytes):
                mentioned_handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
                try:
                    resolve_resp = requests.get(
                        f"{pds_host}/xrpc/com.atproto.identity.resolveHandle",
                        params={"handle": mentioned_handle},
                        timeout=5,
                    )
                    if resolve_resp.status_code != 200:
                        continue
                    did = resolve_resp.json()["did"]
                except Exception:
                    # Best effort: a handle that cannot be resolved is left as
                    # plain text instead of failing the whole post. Unlike a bare
                    # `except:`, this lets KeyboardInterrupt/SystemExit through.
                    continue
                facets.append({
                    "index": {
                        "byteStart": m.start(1),
                        "byteEnd": m.end(1),
                    },
                    "features": [{"$type": "app.bsky.richtext.facet#mention", "did": did}],
                })

            # URLs: group 1 is the URL itself, without the optional prefix character.
            for m in url_pattern.finditer(text_bytes):
                facets.append({
                    "index": {
                        "byteStart": m.start(1),
                        "byteEnd": m.end(1),
                    },
                    "features": [{"$type": "app.bsky.richtext.facet#link", "uri": m.group(1).decode("UTF-8")}],
                })

            # Hashtags: group 1 is the tag without '#'. The '#' sits immediately
            # before group 1, so its offset is start(1) - 1 whichever prefix
            # matched (space, tab, newline, '$', '|' or start of text). Checking
            # only for ' ' and '$' would make the facet cover the preceding
            # tab/newline/'|' as well.
            for m in hashtag_pattern.finditer(text_bytes):
                facets.append({
                    "index": {
                        "byteStart": m.start(1) - 1,
                        "byteEnd": m.end(1),
                    },
                    "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": m.group(1).decode("UTF-8")}],
                })

            if facets:
                post_record["facets"] = facets

            # Create the post
            create_data = {
                "repo": user_did,
                "collection": "app.bsky.feed.post",
                "record": post_record,
            }

            post_response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
            post_response.raise_for_status()
            result = post_response.json()

            post_uri = result.get("uri")
            post_cid = result.get("cid")
            # The record key is the last path segment of the returned URI.
            rkey = post_uri.split("/")[-1] if post_uri else ""
            post_urls.append(f"https://bsky.app/profile/{author_handle}/post/{rkey}")

            # Set up references for thread continuation: every later post
            # replies to the previous one and points at the first as root.
            previous_post = {"uri": post_uri, "cid": post_cid}
            if root_post is None:
                root_post = previous_post

        # Return appropriate message based on single post or thread
        if len(text) == 1:
            return f"Successfully posted to Bluesky!\nPost URL: {post_urls[0]}\nText: {text[0]}\nLanguage: {lang}"

        urls_text = "\n".join(f"Post {i + 1}: {url}" for i, url in enumerate(post_urls))
        return f"Successfully created thread with {len(text)} posts!\n{urls_text}\nLanguage: {lang}"

    except Exception as e:
        # Keep the single, prefixed error type callers see, but chain the
        # original exception so its traceback is not lost.
        raise Exception(f"Error posting to Bluesky: {str(e)}") from e