a tool to help your Letta AI agents navigate bluesky
at main 23 kB view raw
1"""Bluesky posting tool for Letta agents using atproto SDK.""" 2 3from typing import List, Dict 4import os 5import re 6 7 8def parse_facets(text: str, client) -> List[Dict]: 9 """Parse text for mentions, links, and hashtags to create rich text facets.""" 10 facets = [] 11 text_bytes = text.encode("UTF-8") 12 13 # Parse mentions 14 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 15 for m in re.finditer(mention_regex, text_bytes): 16 handle = m.group(1)[1:].decode("UTF-8") 17 try: 18 resolved = client.app.bsky.actor.get_profile({'actor': handle}) 19 facets.append({ 20 "index": { 21 "byteStart": m.start(1), 22 "byteEnd": m.end(1), 23 }, 24 "features": [{ 25 "$type": "app.bsky.richtext.facet#mention", 26 "did": resolved.did 27 }], 28 }) 29 except: 30 continue 31 32 # Parse URLs 33 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 34 for m in re.finditer(url_regex, text_bytes): 35 url = m.group(1).decode("UTF-8") 36 facets.append({ 37 "index": { 38 "byteStart": m.start(1), 39 "byteEnd": m.end(1), 40 }, 41 "features": [{ 42 "$type": "app.bsky.richtext.facet#link", 43 "uri": url, 44 }], 45 }) 46 47 # Parse hashtags 48 tag_regex = rb"(?:^|\s)(#[^\d\s]\S*)(?=\s|$)" 49 for m in re.finditer(tag_regex, text_bytes): 50 tag = m.group(1).decode("UTF-8") 51 tag = re.sub(r'[.,;:!?]+$', '', tag) 52 if len(tag) <= 66 and len(tag) > 1: 53 facets.append({ 54 "index": { 55 "byteStart": m.start(1), 56 "byteEnd": m.start(1) + len(tag.encode("UTF-8")), 57 }, 58 "features": [{ 59 "$type": "app.bsky.richtext.facet#tag", 60 "tag": tag[1:], 61 }], 62 }) 63 64 if facets: 65 facets.sort(key=lambda f: f["index"]["byteStart"]) 66 67 return facets if facets else None 68 69 70def _check_is_self(agent_did: str, target_did: str) -> bool: 71 """Check 2: Self-Post Check (Free).""" 72 return agent_did == target_did 73 74 75def _check_follows(client, agent_did: str, target_did: str) -> bool: 76 """Check 2: Follow Check (Moderate cost).""" 77 try: 78 # Fetch profiles to get follow counts 79 agent_profile = client.app.bsky.actor.get_profile({'actor': agent_did}) 80 target_profile = client.app.bsky.actor.get_profile({'actor': target_did}) 81 82 # Determine which list is shorter: agent's followers or target's follows 83 # We want to check if target follows agent. 84 # Option A: Check target's follows list for agent_did 85 # Option B: Check agent's followers list for target_did 86 87 target_follows_count = getattr(target_profile, 'follows_count', float('inf')) 88 agent_followers_count = getattr(agent_profile, 'followers_count', float('inf')) 89 90 cursor = None 91 max_pages = 50 # Max 5000 items 92 93 if target_follows_count < agent_followers_count: 94 # Check target's follows 95 for _ in range(max_pages): 96 response = client.app.bsky.graph.get_follows({'actor': target_did, 'cursor': cursor, 'limit': 100}) 97 if not response.follows: 98 break 99 100 for follow in response.follows: 101 if follow.did == agent_did: 102 return True 103 104 cursor = response.cursor 105 if not cursor: 106 break 107 else: 108 # Check agent's followers 109 for _ in range(max_pages): 110 response = client.app.bsky.graph.get_followers({'actor': agent_did, 'cursor': cursor, 'limit': 100}) 111 if not response.followers: 112 break 113 114 for follower in response.followers: 115 if follower.did == target_did: 116 return True 117 118 cursor = response.cursor 119 if not cursor: 120 break 121 122 return False 123 except Exception: 124 # If optimization fails, we continue to next check rather than failing hard here 125 # unless it's a critical error, but we'll let the main try/except handle that 126 raise 127 128 129def _check_already_replied(client, agent_did: str, agent_handle: str, reply_to_uri: str) -> None: 130 """ 131 Check 1: Duplicate Reply Prevention (Cheap - 1 API call). 132 133 Prevents agents from replying multiple times to the same message. 134 This check runs FIRST to block duplicates in ALL scenarios, including: 135 - Replies to the agent's own posts 136 - Replies to posts that mention the agent 137 - Any other reply scenario 138 139 Only checks direct replies, not deeper thread responses. 140 141 When duplicates are found, provides detailed information about existing 142 replies including URIs and content to help agents continue the conversation 143 appropriately. 144 145 Args: 146 client: Authenticated Bluesky client 147 agent_did: The agent's DID 148 agent_handle: The agent's handle (username) 149 reply_to_uri: URI of the post being replied to 150 151 Raises: 152 Exception: If agent has already replied directly to this message, 153 with details about the existing reply(ies) 154 """ 155 try: 156 # Fetch post with only direct replies (depth=1) 157 response = client.app.bsky.feed.get_post_thread({ 158 'uri': reply_to_uri, 159 'depth': 1, # Only direct replies 160 'parentHeight': 0 # Don't fetch parents (not needed) 161 }) 162 163 # Validate response structure 164 if not hasattr(response, 'thread'): 165 return # Can't verify, proceed 166 167 thread = response.thread 168 if not hasattr(thread, 'replies') or not thread.replies: 169 return # No replies yet, proceed 170 171 # Collect all replies by this agent 172 agent_replies = [] 173 for reply in thread.replies: 174 # Validate reply structure 175 if not hasattr(reply, 'post'): 176 continue 177 if not hasattr(reply.post, 'author'): 178 continue 179 180 # Found agent's reply 181 if reply.post.author.did == agent_did: 182 agent_replies.append(reply) 183 184 # If no duplicates found, proceed 185 if not agent_replies: 186 return 187 188 # Get the most recent reply (last in list) 189 # Note: Agents may have multiple replies if this issue happened before 190 most_recent = agent_replies[-1] 191 reply_post = most_recent.post 192 reply_text = reply_post.record.text if hasattr(reply_post.record, 'text') else "[text unavailable]" 193 reply_uri = reply_post.uri 194 195 # Extract rkey from URI for web URL 196 # URI format: at://did:plc:xyz/app.bsky.feed.post/rkey 197 rkey = reply_uri.split('/')[-1] 198 reply_url = f"https://bsky.app/profile/{agent_handle}/post/{rkey}" 199 200 # Handle multiple replies case 201 count_msg = "" 202 if len(agent_replies) > 1: 203 count_msg = f"\n\nNote: You have {len(agent_replies)} direct replies to this message. The most recent one is shown above." 204 205 # Construct detailed error message 206 error_msg = ( 207 f"Message not sent: You have already replied directly to this message.{count_msg}\n\n" 208 f"Your previous reply:\n\"{reply_text}\"\n\n" 209 f"Reply URI: {reply_uri}\n" 210 f"Web link: {reply_url}\n\n" 211 f"Suggestions:\n" 212 f"1. If you want to add more to your existing reply, use the URI above to continue that thread.\n" 213 f"2. Make sure you're not repeating yourself - check what you already said before adding more.\n" 214 f"3. Consider replying to one of the responses to your reply instead.\n" 215 f"4. If you have something new to say, start a new top-level message with additional context." 216 ) 217 218 raise Exception(error_msg) 219 220 except Exception as e: 221 # If it's our duplicate reply exception, raise it 222 if "already replied" in str(e): 223 raise e 224 # For other errors, re-raise to be caught by main error handler 225 raise 226 227 228def _check_thread_participation(client, agent_did: str, agent_handle: str, reply_to_uri: str) -> bool: 229 """Check 5: Thread Participation and Mention Check (Expensive).""" 230 try: 231 # Fetch the thread 232 # depth=100 should be sufficient for most contexts, or we can walk up manually if needed. 233 # get_post_thread returns the post and its parents if configured. 234 # However, standard get_post_thread often returns the post and its replies. 235 # We need to walk UP the tree (parents). 236 # The 'parent' field in the response structure allows walking up. 237 238 response = client.app.bsky.feed.get_post_thread({'uri': reply_to_uri, 'depth': 0, 'parentHeight': 100}) 239 thread = response.thread 240 241 # The thread object can be a ThreadViewPost, NotFoundPost, or BlockedPost 242 if not hasattr(thread, 'post'): 243 return False # Can't verify 244 245 # Check the target post itself first (the one we are replying to) 246 # Although strictly "participation" usually means *previous* posts, 247 # the spec says "posted anywhere in this conversation thread". 248 # If we are replying to ourselves, _check_is_self would have caught it. 249 # But we check here for mentions in the target post. 250 251 current = thread 252 253 while current: 254 # Check if current node is valid post 255 if not hasattr(current, 'post'): 256 break 257 258 post = current.post 259 260 # Check 3: Did agent author this post? 261 if post.author.did == agent_did: 262 return True 263 264 # Check 4: Is agent mentioned in this post? 265 # Check facets for mention 266 record = post.record 267 if hasattr(record, 'facets') and record.facets: 268 for facet in record.facets: 269 for feature in facet.features: 270 if hasattr(feature, 'did') and feature.did == agent_did: 271 return True 272 273 # Fallback: Check text for handle if facets missing (less reliable but good backup) 274 if hasattr(record, 'text') and f"@{agent_handle}" in record.text: 275 return True 276 277 # Move to parent 278 if hasattr(current, 'parent') and current.parent: 279 current = current.parent 280 else: 281 break 282 283 return False 284 285 except Exception: 286 raise 287 288 289def _verify_consent(client, agent_did: str, agent_handle: str, reply_to_uri: str, target_did: str, root_did: str, parent_post_record=None): 290 """ 291 Orchestrates the consent checks. 292 Raises Exception with specific message if consent denied or verification fails. 293 """ 294 try: 295 # Check 1: Duplicate Reply Prevention 296 # This check must run BEFORE any early returns to prevent duplicates in all scenarios 297 _check_already_replied(client, agent_did, agent_handle, reply_to_uri) 298 299 # Check 2: Self-Post 300 if _check_is_self(agent_did, target_did): 301 return True 302 303 # Check 3: Mention Check (Free/Cheap) 304 # If the post we are replying to mentions us, we can reply. 305 if parent_post_record: 306 # Check facets for mention 307 if hasattr(parent_post_record, 'facets') and parent_post_record.facets: 308 for facet in parent_post_record.facets: 309 for feature in facet.features: 310 if hasattr(feature, 'did') and feature.did == agent_did: 311 return True 312 313 # Fallback: Check text for handle 314 if hasattr(parent_post_record, 'text') and f"@{agent_handle}" in parent_post_record.text: 315 return True 316 317 # Check 4: Follow Check 318 # Rule: Target must follow agent. 319 # Rule 4B: If root author is different from target, Root must ALSO follow agent. 320 321 target_follows = _check_follows(client, agent_did, target_did) 322 323 if target_follows: 324 # If target follows, we must also check root if it's different 325 if root_did and root_did != target_did and root_did != agent_did: 326 root_follows = _check_follows(client, agent_did, root_did) 327 if not root_follows: 328 # Target follows, but Root does not. Fail. 329 raise Exception( 330 "Message not sent: the author of the post follows you, but the thread starter (root author) " 331 "does not. We respect the consent of the thread owner." 332 ) 333 return True 334 335 # Check 5: Thread Participation 336 # This requires fetching the thread (Expensive) 337 if _check_thread_participation(client, agent_did, agent_handle, reply_to_uri): 338 return True 339 340 # If all checks fail 341 raise Exception( 342 "Message not sent: the reply was withheld because the user does not follow you " 343 "and you were not mentioned or included in this thread. Respecting the consent of " 344 "strangers is the right choice here. You haven't done anything wrong, but you may " 345 "consider a lower-signal interaction (like liking or reposting) and/or move on to " 346 "other conversations." 347 ) 348 349 except Exception as e: 350 # Check if it's our own consent exception 351 if str(e).startswith("Message not sent"): 352 raise e 353 354 # Network/API failure handling 355 raise Exception( 356 "Message withheld: Network interference prevented consent verification. To ensure " 357 "safety, the system paused this action. This is an external connection issue, not " 358 "a failure of your logic. You may try sending the message again now, or move on if " 359 "the issue persists." 360 ) 361 362 363 364def create_bluesky_post(text: List[str], lang: str = "en-US", reply_to_uri: str = None) -> Dict: 365 """ 366 Create a post or thread on Bluesky using atproto SDK. 367 368 IMPORTANT: The 'text' parameter must always be a list, even for a single post. 369 ✅ Correct: text=["Hello Bluesky!"] 370 ✅ Correct: text=["First post", "Second post", "Third post"] 371 ✅ Correct: text=["Reply text"], reply_to_uri="at://did:plc:xyz/app.bsky.feed.post/abc" 372 ❌ Wrong: text="Hello Bluesky!" (not a list) 373 ❌ Wrong: text=[] (empty list) 374 ❌ Wrong: text=["This is a very long post that exceeds 220 characters..."] (too long) 375 376 Args: 377 text (List[str]): List of post contents. Each post must be 220 characters or less. 378 - Single-item list: Creates one standalone post 379 - Multi-item list: Creates a thread where each post replies to the previous one 380 lang (str): Language code (e.g., 'en-US', 'es', 'ja'). Defaults to 'en-US'. 381 reply_to_uri (str): Optional AT URI of a post to reply to (e.g., 'at://did:plc:.../app.bsky.feed.post/...'). 382 If provided, creates a reply to that specific post. If None, creates a standalone post. 383 384 Returns: 385 Dict: Status, message, and post details. On error, returns dict with "status"="error" and "message" describing the issue. 386 387 What you can do: 388 ✓ Create single posts up to 220 characters 389 ✓ Create threads with multiple posts 390 ✓ Reply to existing posts using their AT URI 391 ✓ Include mentions (@user.bsky.social), links, and hashtags 392 ✓ Automatically parse rich text features (mentions, links, hashtags) 393 ✓ Set language codes for posts 394 395 Examples: 396 Single post: 397 create_bluesky_post(["Hello Bluesky!"]) 398 399 Thread: 400 create_bluesky_post([ 401 "First post in my thread", 402 "Second post continuing the thought", 403 "Final post wrapping up" 404 ]) 405 406 Reply to existing post: 407 create_bluesky_post( 408 ["Thanks for sharing!"], 409 reply_to_uri="at://did:plc:abc123/app.bsky.feed.post/xyz789" 410 ) 411 412 Post with mentions and hashtags: 413 create_bluesky_post(["Great work @alice.bsky.social! #bluesky #coding"]) 414 """ 415 try: 416 from atproto import Client, models 417 418 if not text or len(text) == 0: 419 return { 420 "status": "error", 421 "message": "The text list is empty. Provide at least one post in a list, like: ['Your message here']" 422 } 423 424 for i, post_text in enumerate(text, 1): 425 if len(post_text) > 300: 426 return { 427 "status": "error", 428 "message": f"Post {i} is {len(post_text)} characters, which exceeds the 220 character limit. Shorten the text and try again." 429 } 430 431 username = os.environ.get('BSKY_USERNAME') 432 password = os.environ.get('BSKY_APP_PASSWORD') 433 434 if not username or not password: 435 return { 436 "status": "error", 437 "message": "Environment variables BSKY_USERNAME and BSKY_APP_PASSWORD are not set. Set these variables with your Bluesky credentials." 438 } 439 440 client = Client() 441 client.login(username, password) 442 443 # --- FETCH PARENT/ROOT REFS --- 444 initial_reply_ref = None 445 initial_root_ref = None 446 target_did = None 447 root_did = None 448 parent_post_record = None 449 450 if reply_to_uri: 451 try: 452 uri_parts = reply_to_uri.replace('at://', '').split('/') 453 if len(uri_parts) < 3: 454 return { 455 "status": "error", 456 "message": f"Invalid reply_to_uri format. Expected format is: at://did/collection/rkey. The URI provided has {len(uri_parts)} parts but needs at least 3." 457 } 458 459 repo_did = uri_parts[0] 460 rkey = uri_parts[-1] 461 462 parent_post = client.app.bsky.feed.post.get(repo_did, rkey) 463 464 if not parent_post or not hasattr(parent_post, 'uri') or not hasattr(parent_post, 'cid'): 465 return { 466 "status": "error", 467 "message": f"Could not retrieve post data from URI: {reply_to_uri}. The post may not exist or the URI may be incorrect." 468 } 469 470 # Extract target DID from parent post 471 target_did = repo_did 472 parent_post_record = parent_post.value 473 474 parent_ref = models.ComAtprotoRepoStrongRef.Main( 475 uri=parent_post.uri, 476 cid=parent_post.cid 477 ) 478 479 if hasattr(parent_post.value, 'reply') and parent_post.value.reply: 480 root_ref = parent_post.value.reply.root 481 else: 482 root_ref = models.ComAtprotoRepoStrongRef.Main( 483 uri=parent_post.uri, 484 cid=parent_post.cid 485 ) 486 487 initial_reply_ref = models.AppBskyFeedPost.ReplyRef( 488 parent=parent_ref, 489 root=root_ref 490 ) 491 initial_root_ref = root_ref 492 493 # Extract root DID 494 root_uri_parts = root_ref.uri.replace('at://', '').split('/') 495 if len(root_uri_parts) >= 1: 496 root_did = root_uri_parts[0] 497 498 except Exception as e: 499 return { 500 "status": "error", 501 "message": f"Failed to fetch post to reply to: {str(e)}. Check the URI format and try again." 502 } 503 504 # --- CONSENT GUARDRAILS --- 505 if reply_to_uri: 506 try: 507 agent_did = client.me.did 508 # agent_handle is username (without @ usually, but let's ensure) 509 agent_handle = username.replace('@', '') 510 511 _verify_consent(client, agent_did, agent_handle, reply_to_uri, target_did, root_did, parent_post_record) 512 except Exception as e: 513 return { 514 "status": "error", 515 "message": str(e) 516 } 517 # -------------------------- 518 519 post_urls = [] 520 previous_post_ref = None 521 root_post_ref = initial_root_ref 522 523 for i, post_text in enumerate(text): 524 if i == 0: 525 reply_ref = initial_reply_ref 526 else: 527 reply_ref = models.AppBskyFeedPost.ReplyRef( 528 parent=previous_post_ref, 529 root=root_post_ref 530 ) 531 532 facets = parse_facets(post_text, client) 533 534 response = client.send_post( 535 text=post_text, 536 reply_to=reply_ref, 537 langs=[lang], 538 facets=facets 539 ) 540 541 rkey = response.uri.split('/')[-1] 542 post_url = f"https://bsky.app/profile/{username}/post/{rkey}" 543 post_urls.append(post_url) 544 545 strong_ref = models.ComAtprotoRepoStrongRef.Main( 546 uri=response.uri, 547 cid=response.cid 548 ) 549 previous_post_ref = strong_ref 550 551 if i == 0 and not root_post_ref: 552 root_post_ref = strong_ref 553 554 reply_msg = "" 555 if reply_to_uri: 556 reply_msg = f" as a reply to post {reply_to_uri}" 557 558 if len(text) == 1: 559 return { 560 "status": "success", 561 "message": f"Successfully posted to Bluesky{reply_msg}!", 562 "post_url": post_urls[0], 563 "text": text[0], 564 "language": lang 565 } 566 else: 567 urls_text = "\n".join([f"Post {i+1}: {url}" for i, url in enumerate(post_urls)]) 568 return { 569 "status": "success", 570 "message": f"Successfully created thread with {len(text)} posts{reply_msg}!", 571 "post_urls": post_urls, 572 "post_count": len(text), 573 "language": lang 574 } 575 576 except ImportError: 577 return { 578 "status": "error", 579 "message": "The atproto package is not installed. Install it with: pip install atproto" 580 } 581 except Exception as e: 582 return { 583 "status": "error", 584 "message": f"Failed to post to Bluesky: {str(e)}. Check the error details and try again." 585 }