a tool to help your Letta AI agents navigate bluesky
1"""Bluesky posting tool for Letta agents using atproto SDK."""
2
3from typing import List, Dict
4import os
5import re
6
7
8def parse_facets(text: str, client) -> List[Dict]:
9 """Parse text for mentions, links, and hashtags to create rich text facets."""
10 facets = []
11 text_bytes = text.encode("UTF-8")
12
13 # Parse mentions
14 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
15 for m in re.finditer(mention_regex, text_bytes):
16 handle = m.group(1)[1:].decode("UTF-8")
17 try:
18 resolved = client.app.bsky.actor.get_profile({'actor': handle})
19 facets.append({
20 "index": {
21 "byteStart": m.start(1),
22 "byteEnd": m.end(1),
23 },
24 "features": [{
25 "$type": "app.bsky.richtext.facet#mention",
26 "did": resolved.did
27 }],
28 })
29 except:
30 continue
31
32 # Parse URLs
33 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
34 for m in re.finditer(url_regex, text_bytes):
35 url = m.group(1).decode("UTF-8")
36 facets.append({
37 "index": {
38 "byteStart": m.start(1),
39 "byteEnd": m.end(1),
40 },
41 "features": [{
42 "$type": "app.bsky.richtext.facet#link",
43 "uri": url,
44 }],
45 })
46
47 # Parse hashtags
48 tag_regex = rb"(?:^|\s)(#[^\d\s]\S*)(?=\s|$)"
49 for m in re.finditer(tag_regex, text_bytes):
50 tag = m.group(1).decode("UTF-8")
51 tag = re.sub(r'[.,;:!?]+$', '', tag)
52 if len(tag) <= 66 and len(tag) > 1:
53 facets.append({
54 "index": {
55 "byteStart": m.start(1),
56 "byteEnd": m.start(1) + len(tag.encode("UTF-8")),
57 },
58 "features": [{
59 "$type": "app.bsky.richtext.facet#tag",
60 "tag": tag[1:],
61 }],
62 })
63
64 if facets:
65 facets.sort(key=lambda f: f["index"]["byteStart"])
66
67 return facets if facets else None
68
69
70def _check_is_self(agent_did: str, target_did: str) -> bool:
71 """Check 2: Self-Post Check (Free)."""
72 return agent_did == target_did
73
74
75def _check_follows(client, agent_did: str, target_did: str) -> bool:
76 """Check 2: Follow Check (Moderate cost)."""
77 try:
78 # Fetch profiles to get follow counts
79 agent_profile = client.app.bsky.actor.get_profile({'actor': agent_did})
80 target_profile = client.app.bsky.actor.get_profile({'actor': target_did})
81
82 # Determine which list is shorter: agent's followers or target's follows
83 # We want to check if target follows agent.
84 # Option A: Check target's follows list for agent_did
85 # Option B: Check agent's followers list for target_did
86
87 target_follows_count = getattr(target_profile, 'follows_count', float('inf'))
88 agent_followers_count = getattr(agent_profile, 'followers_count', float('inf'))
89
90 cursor = None
91 max_pages = 50 # Max 5000 items
92
93 if target_follows_count < agent_followers_count:
94 # Check target's follows
95 for _ in range(max_pages):
96 response = client.app.bsky.graph.get_follows({'actor': target_did, 'cursor': cursor, 'limit': 100})
97 if not response.follows:
98 break
99
100 for follow in response.follows:
101 if follow.did == agent_did:
102 return True
103
104 cursor = response.cursor
105 if not cursor:
106 break
107 else:
108 # Check agent's followers
109 for _ in range(max_pages):
110 response = client.app.bsky.graph.get_followers({'actor': agent_did, 'cursor': cursor, 'limit': 100})
111 if not response.followers:
112 break
113
114 for follower in response.followers:
115 if follower.did == target_did:
116 return True
117
118 cursor = response.cursor
119 if not cursor:
120 break
121
122 return False
123 except Exception:
124 # If optimization fails, we continue to next check rather than failing hard here
125 # unless it's a critical error, but we'll let the main try/except handle that
126 raise
127
128
129def _check_already_replied(client, agent_did: str, agent_handle: str, reply_to_uri: str) -> None:
130 """
131 Check 1: Duplicate Reply Prevention (Cheap - 1 API call).
132
133 Prevents agents from replying multiple times to the same message.
134 This check runs FIRST to block duplicates in ALL scenarios, including:
135 - Replies to the agent's own posts
136 - Replies to posts that mention the agent
137 - Any other reply scenario
138
139 Only checks direct replies, not deeper thread responses.
140
141 When duplicates are found, provides detailed information about existing
142 replies including URIs and content to help agents continue the conversation
143 appropriately.
144
145 Args:
146 client: Authenticated Bluesky client
147 agent_did: The agent's DID
148 agent_handle: The agent's handle (username)
149 reply_to_uri: URI of the post being replied to
150
151 Raises:
152 Exception: If agent has already replied directly to this message,
153 with details about the existing reply(ies)
154 """
155 try:
156 # Fetch post with only direct replies (depth=1)
157 response = client.app.bsky.feed.get_post_thread({
158 'uri': reply_to_uri,
159 'depth': 1, # Only direct replies
160 'parentHeight': 0 # Don't fetch parents (not needed)
161 })
162
163 # Validate response structure
164 if not hasattr(response, 'thread'):
165 return # Can't verify, proceed
166
167 thread = response.thread
168 if not hasattr(thread, 'replies') or not thread.replies:
169 return # No replies yet, proceed
170
171 # Collect all replies by this agent
172 agent_replies = []
173 for reply in thread.replies:
174 # Validate reply structure
175 if not hasattr(reply, 'post'):
176 continue
177 if not hasattr(reply.post, 'author'):
178 continue
179
180 # Found agent's reply
181 if reply.post.author.did == agent_did:
182 agent_replies.append(reply)
183
184 # If no duplicates found, proceed
185 if not agent_replies:
186 return
187
188 # Get the most recent reply (last in list)
189 # Note: Agents may have multiple replies if this issue happened before
190 most_recent = agent_replies[-1]
191 reply_post = most_recent.post
192 reply_text = reply_post.record.text if hasattr(reply_post.record, 'text') else "[text unavailable]"
193 reply_uri = reply_post.uri
194
195 # Extract rkey from URI for web URL
196 # URI format: at://did:plc:xyz/app.bsky.feed.post/rkey
197 rkey = reply_uri.split('/')[-1]
198 reply_url = f"https://bsky.app/profile/{agent_handle}/post/{rkey}"
199
200 # Handle multiple replies case
201 count_msg = ""
202 if len(agent_replies) > 1:
203 count_msg = f"\n\nNote: You have {len(agent_replies)} direct replies to this message. The most recent one is shown above."
204
205 # Construct detailed error message
206 error_msg = (
207 f"Message not sent: You have already replied directly to this message.{count_msg}\n\n"
208 f"Your previous reply:\n\"{reply_text}\"\n\n"
209 f"Reply URI: {reply_uri}\n"
210 f"Web link: {reply_url}\n\n"
211 f"Suggestions:\n"
212 f"1. If you want to add more to your existing reply, use the URI above to continue that thread.\n"
213 f"2. Make sure you're not repeating yourself - check what you already said before adding more.\n"
214 f"3. Consider replying to one of the responses to your reply instead.\n"
215 f"4. If you have something new to say, start a new top-level message with additional context."
216 )
217
218 raise Exception(error_msg)
219
220 except Exception as e:
221 # If it's our duplicate reply exception, raise it
222 if "already replied" in str(e):
223 raise e
224 # For other errors, re-raise to be caught by main error handler
225 raise
226
227
228def _check_thread_participation(client, agent_did: str, agent_handle: str, reply_to_uri: str) -> bool:
229 """Check 5: Thread Participation and Mention Check (Expensive)."""
230 try:
231 # Fetch the thread
232 # depth=100 should be sufficient for most contexts, or we can walk up manually if needed.
233 # get_post_thread returns the post and its parents if configured.
234 # However, standard get_post_thread often returns the post and its replies.
235 # We need to walk UP the tree (parents).
236 # The 'parent' field in the response structure allows walking up.
237
238 response = client.app.bsky.feed.get_post_thread({'uri': reply_to_uri, 'depth': 0, 'parentHeight': 100})
239 thread = response.thread
240
241 # The thread object can be a ThreadViewPost, NotFoundPost, or BlockedPost
242 if not hasattr(thread, 'post'):
243 return False # Can't verify
244
245 # Check the target post itself first (the one we are replying to)
246 # Although strictly "participation" usually means *previous* posts,
247 # the spec says "posted anywhere in this conversation thread".
248 # If we are replying to ourselves, _check_is_self would have caught it.
249 # But we check here for mentions in the target post.
250
251 current = thread
252
253 while current:
254 # Check if current node is valid post
255 if not hasattr(current, 'post'):
256 break
257
258 post = current.post
259
260 # Check 3: Did agent author this post?
261 if post.author.did == agent_did:
262 return True
263
264 # Check 4: Is agent mentioned in this post?
265 # Check facets for mention
266 record = post.record
267 if hasattr(record, 'facets') and record.facets:
268 for facet in record.facets:
269 for feature in facet.features:
270 if hasattr(feature, 'did') and feature.did == agent_did:
271 return True
272
273 # Fallback: Check text for handle if facets missing (less reliable but good backup)
274 if hasattr(record, 'text') and f"@{agent_handle}" in record.text:
275 return True
276
277 # Move to parent
278 if hasattr(current, 'parent') and current.parent:
279 current = current.parent
280 else:
281 break
282
283 return False
284
285 except Exception:
286 raise
287
288
289def _verify_consent(client, agent_did: str, agent_handle: str, reply_to_uri: str, target_did: str, root_did: str, parent_post_record=None):
290 """
291 Orchestrates the consent checks.
292 Raises Exception with specific message if consent denied or verification fails.
293 """
294 try:
295 # Check 1: Duplicate Reply Prevention
296 # This check must run BEFORE any early returns to prevent duplicates in all scenarios
297 _check_already_replied(client, agent_did, agent_handle, reply_to_uri)
298
299 # Check 2: Self-Post
300 if _check_is_self(agent_did, target_did):
301 return True
302
303 # Check 3: Mention Check (Free/Cheap)
304 # If the post we are replying to mentions us, we can reply.
305 if parent_post_record:
306 # Check facets for mention
307 if hasattr(parent_post_record, 'facets') and parent_post_record.facets:
308 for facet in parent_post_record.facets:
309 for feature in facet.features:
310 if hasattr(feature, 'did') and feature.did == agent_did:
311 return True
312
313 # Fallback: Check text for handle
314 if hasattr(parent_post_record, 'text') and f"@{agent_handle}" in parent_post_record.text:
315 return True
316
317 # Check 4: Follow Check
318 # Rule: Target must follow agent.
319 # Rule 4B: If root author is different from target, Root must ALSO follow agent.
320
321 target_follows = _check_follows(client, agent_did, target_did)
322
323 if target_follows:
324 # If target follows, we must also check root if it's different
325 if root_did and root_did != target_did and root_did != agent_did:
326 root_follows = _check_follows(client, agent_did, root_did)
327 if not root_follows:
328 # Target follows, but Root does not. Fail.
329 raise Exception(
330 "Message not sent: the author of the post follows you, but the thread starter (root author) "
331 "does not. We respect the consent of the thread owner."
332 )
333 return True
334
335 # Check 5: Thread Participation
336 # This requires fetching the thread (Expensive)
337 if _check_thread_participation(client, agent_did, agent_handle, reply_to_uri):
338 return True
339
340 # If all checks fail
341 raise Exception(
342 "Message not sent: the reply was withheld because the user does not follow you "
343 "and you were not mentioned or included in this thread. Respecting the consent of "
344 "strangers is the right choice here. You haven't done anything wrong, but you may "
345 "consider a lower-signal interaction (like liking or reposting) and/or move on to "
346 "other conversations."
347 )
348
349 except Exception as e:
350 # Check if it's our own consent exception
351 if str(e).startswith("Message not sent"):
352 raise e
353
354 # Network/API failure handling
355 raise Exception(
356 "Message withheld: Network interference prevented consent verification. To ensure "
357 "safety, the system paused this action. This is an external connection issue, not "
358 "a failure of your logic. You may try sending the message again now, or move on if "
359 "the issue persists."
360 )
361
362
363
364def create_bluesky_post(text: List[str], lang: str = "en-US", reply_to_uri: str = None) -> Dict:
365 """
366 Create a post or thread on Bluesky using atproto SDK.
367
368 IMPORTANT: The 'text' parameter must always be a list, even for a single post.
369 ✅ Correct: text=["Hello Bluesky!"]
370 ✅ Correct: text=["First post", "Second post", "Third post"]
371 ✅ Correct: text=["Reply text"], reply_to_uri="at://did:plc:xyz/app.bsky.feed.post/abc"
372 ❌ Wrong: text="Hello Bluesky!" (not a list)
373 ❌ Wrong: text=[] (empty list)
374 ❌ Wrong: text=["This is a very long post that exceeds 220 characters..."] (too long)
375
376 Args:
377 text (List[str]): List of post contents. Each post must be 220 characters or less.
378 - Single-item list: Creates one standalone post
379 - Multi-item list: Creates a thread where each post replies to the previous one
380 lang (str): Language code (e.g., 'en-US', 'es', 'ja'). Defaults to 'en-US'.
381 reply_to_uri (str): Optional AT URI of a post to reply to (e.g., 'at://did:plc:.../app.bsky.feed.post/...').
382 If provided, creates a reply to that specific post. If None, creates a standalone post.
383
384 Returns:
385 Dict: Status, message, and post details. On error, returns dict with "status"="error" and "message" describing the issue.
386
387 What you can do:
388 ✓ Create single posts up to 220 characters
389 ✓ Create threads with multiple posts
390 ✓ Reply to existing posts using their AT URI
391 ✓ Include mentions (@user.bsky.social), links, and hashtags
392 ✓ Automatically parse rich text features (mentions, links, hashtags)
393 ✓ Set language codes for posts
394
395 Examples:
396 Single post:
397 create_bluesky_post(["Hello Bluesky!"])
398
399 Thread:
400 create_bluesky_post([
401 "First post in my thread",
402 "Second post continuing the thought",
403 "Final post wrapping up"
404 ])
405
406 Reply to existing post:
407 create_bluesky_post(
408 ["Thanks for sharing!"],
409 reply_to_uri="at://did:plc:abc123/app.bsky.feed.post/xyz789"
410 )
411
412 Post with mentions and hashtags:
413 create_bluesky_post(["Great work @alice.bsky.social! #bluesky #coding"])
414 """
415 try:
416 from atproto import Client, models
417
418 if not text or len(text) == 0:
419 return {
420 "status": "error",
421 "message": "The text list is empty. Provide at least one post in a list, like: ['Your message here']"
422 }
423
424 for i, post_text in enumerate(text, 1):
425 if len(post_text) > 300:
426 return {
427 "status": "error",
428 "message": f"Post {i} is {len(post_text)} characters, which exceeds the 220 character limit. Shorten the text and try again."
429 }
430
431 username = os.environ.get('BSKY_USERNAME')
432 password = os.environ.get('BSKY_APP_PASSWORD')
433
434 if not username or not password:
435 return {
436 "status": "error",
437 "message": "Environment variables BSKY_USERNAME and BSKY_APP_PASSWORD are not set. Set these variables with your Bluesky credentials."
438 }
439
440 client = Client()
441 client.login(username, password)
442
443 # --- FETCH PARENT/ROOT REFS ---
444 initial_reply_ref = None
445 initial_root_ref = None
446 target_did = None
447 root_did = None
448 parent_post_record = None
449
450 if reply_to_uri:
451 try:
452 uri_parts = reply_to_uri.replace('at://', '').split('/')
453 if len(uri_parts) < 3:
454 return {
455 "status": "error",
456 "message": f"Invalid reply_to_uri format. Expected format is: at://did/collection/rkey. The URI provided has {len(uri_parts)} parts but needs at least 3."
457 }
458
459 repo_did = uri_parts[0]
460 rkey = uri_parts[-1]
461
462 parent_post = client.app.bsky.feed.post.get(repo_did, rkey)
463
464 if not parent_post or not hasattr(parent_post, 'uri') or not hasattr(parent_post, 'cid'):
465 return {
466 "status": "error",
467 "message": f"Could not retrieve post data from URI: {reply_to_uri}. The post may not exist or the URI may be incorrect."
468 }
469
470 # Extract target DID from parent post
471 target_did = repo_did
472 parent_post_record = parent_post.value
473
474 parent_ref = models.ComAtprotoRepoStrongRef.Main(
475 uri=parent_post.uri,
476 cid=parent_post.cid
477 )
478
479 if hasattr(parent_post.value, 'reply') and parent_post.value.reply:
480 root_ref = parent_post.value.reply.root
481 else:
482 root_ref = models.ComAtprotoRepoStrongRef.Main(
483 uri=parent_post.uri,
484 cid=parent_post.cid
485 )
486
487 initial_reply_ref = models.AppBskyFeedPost.ReplyRef(
488 parent=parent_ref,
489 root=root_ref
490 )
491 initial_root_ref = root_ref
492
493 # Extract root DID
494 root_uri_parts = root_ref.uri.replace('at://', '').split('/')
495 if len(root_uri_parts) >= 1:
496 root_did = root_uri_parts[0]
497
498 except Exception as e:
499 return {
500 "status": "error",
501 "message": f"Failed to fetch post to reply to: {str(e)}. Check the URI format and try again."
502 }
503
504 # --- CONSENT GUARDRAILS ---
505 if reply_to_uri:
506 try:
507 agent_did = client.me.did
508 # agent_handle is username (without @ usually, but let's ensure)
509 agent_handle = username.replace('@', '')
510
511 _verify_consent(client, agent_did, agent_handle, reply_to_uri, target_did, root_did, parent_post_record)
512 except Exception as e:
513 return {
514 "status": "error",
515 "message": str(e)
516 }
517 # --------------------------
518
519 post_urls = []
520 previous_post_ref = None
521 root_post_ref = initial_root_ref
522
523 for i, post_text in enumerate(text):
524 if i == 0:
525 reply_ref = initial_reply_ref
526 else:
527 reply_ref = models.AppBskyFeedPost.ReplyRef(
528 parent=previous_post_ref,
529 root=root_post_ref
530 )
531
532 facets = parse_facets(post_text, client)
533
534 response = client.send_post(
535 text=post_text,
536 reply_to=reply_ref,
537 langs=[lang],
538 facets=facets
539 )
540
541 rkey = response.uri.split('/')[-1]
542 post_url = f"https://bsky.app/profile/{username}/post/{rkey}"
543 post_urls.append(post_url)
544
545 strong_ref = models.ComAtprotoRepoStrongRef.Main(
546 uri=response.uri,
547 cid=response.cid
548 )
549 previous_post_ref = strong_ref
550
551 if i == 0 and not root_post_ref:
552 root_post_ref = strong_ref
553
554 reply_msg = ""
555 if reply_to_uri:
556 reply_msg = f" as a reply to post {reply_to_uri}"
557
558 if len(text) == 1:
559 return {
560 "status": "success",
561 "message": f"Successfully posted to Bluesky{reply_msg}!",
562 "post_url": post_urls[0],
563 "text": text[0],
564 "language": lang
565 }
566 else:
567 urls_text = "\n".join([f"Post {i+1}: {url}" for i, url in enumerate(post_urls)])
568 return {
569 "status": "success",
570 "message": f"Successfully created thread with {len(text)} posts{reply_msg}!",
571 "post_urls": post_urls,
572 "post_count": len(text),
573 "language": lang
574 }
575
576 except ImportError:
577 return {
578 "status": "error",
579 "message": "The atproto package is not installed. Install it with: pip install atproto"
580 }
581 except Exception as e:
582 return {
583 "status": "error",
584 "message": f"Failed to post to Bluesky: {str(e)}. Check the error details and try again."
585 }