# A digital person for Bluesky
"""Bluesky (AT Protocol) helpers: session handling, thread formatting, replies
and custom ``stream.thought.*`` record creation."""

import json
import logging
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import dotenv
import yaml
from atproto_client import Client, Session, SessionEvent, models

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables
dotenv.load_dotenv(override=True)

# PDS used when the PDS_URI environment variable is not set.
_DEFAULT_PDS_URI = "https://bsky.social"

# Upper bound on the number of messages in one threaded reply.
_MAX_THREAD_REPLIES = 15

# Timeout (seconds) for direct createRecord HTTP calls.
_RECORD_REQUEST_TIMEOUT = 10

# Rich-text detection patterns. They run over the UTF-8 *encoded* text because
# facet indices are byte offsets, not character offsets. Group 1 is the
# mention/URL itself, without the optional leading delimiter.
_MENTION_REGEX = re.compile(
    rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
)
_URL_REGEX = re.compile(
    rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
)

# Strip fields. A list of fields to remove from a JSON object
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]


def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Objects exposing ``__dict__`` are replaced by their (recursively converted)
    attribute dictionary; dicts and lists are converted element-wise; ``str``,
    ``int``, ``float``, ``bool`` and ``None`` pass through unchanged; anything
    else is turned into ``str(obj)``.
    """
    # The __dict__ check comes first on purpose: it also catches instances of
    # dict/list subclasses, which are then serialized via their attributes.
    if hasattr(obj, "__dict__"):
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_to_basic_types(item) for item in obj]
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # For other types, fall back to the string representation
    return str(obj)


def _is_empty_value(value) -> bool:
    """Return True for None, an empty dict/list, or a blank string."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    return isinstance(value, str) and value.strip() == ""


def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object, in place.

    For dicts, keys listed in ``strip_field_list`` and string keys starting with
    ``"__"`` are removed; remaining values are processed recursively and then
    dropped if they end up None, empty, or blank. For lists, items are processed
    recursively and None items are removed.

    Args:
        obj: The dict/list/scalar to clean. Dicts and lists are mutated.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj`` that was passed in (after mutation).
    """
    if isinstance(obj, dict):
        # Remove fields from strip list and dunder-prefixed metadata keys.
        # The isinstance guard keeps non-string keys from raising AttributeError.
        for field in list(obj):
            if field in strip_field_list or (
                isinstance(field, str) and field.startswith("__")
            ):
                del obj[field]

        # Recursively process remaining values, dropping the ones left empty
        for key, value in list(obj.items()):
            cleaned = strip_fields(value, strip_field_list)
            if _is_empty_value(cleaned):
                del obj[key]
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        processed = [strip_fields(item, strip_field_list) for item in obj]
        # Only None is removed from lists; empty containers are kept
        obj[:] = [item for item in processed if item is not None]

    return obj


def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a list while preserving all data.

    The thread is walked from the main node up through its ``parent`` links;
    posts are emitted oldest ancestor first, main post last.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with 'posts' key containing a list of posts in chronological order
    """
    # Locate the main thread node
    thread_root = None
    if hasattr(thread_data, "thread"):
        thread_root = thread_data.thread
    elif hasattr(thread_data, "__dict__") and "thread" in thread_data.__dict__:
        thread_root = thread_data.__dict__["thread"]

    # Collect the node and all its ancestors (child -> ... -> root)
    chain = []
    node = thread_root
    while node:
        chain.append(node)
        node = getattr(node, "parent", None)

    posts = []
    # Reverse so that parents come before their replies
    for node in reversed(chain):
        post = getattr(node, "post", None)
        if not post:
            continue
        # Shallow-copy to a dict so later processing does not touch the model
        if hasattr(post, "__dict__"):
            posts.append(post.__dict__.copy())
        elif isinstance(post, dict):
            posts.append(post.copy())
        else:
            posts.append({})

    # Return a simple structure with posts list
    return {"posts": posts}


def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # First flatten the thread structure to avoid deep nesting
    flattened = flatten_thread_structure(thread)

    # Convert complex objects to basic types; this builds a fresh structure,
    # so the in-place strip below never touches the caller's data.
    basic_thread = convert_to_basic_types(flattened)

    if strip_metadata:
        cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
    else:
        cleaned_thread = basic_thread

    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def _session_path(username: str) -> str:
    """Return the path of the file that stores ``username``'s session string."""
    return f"session_{username}.txt"


def get_session(username: str) -> Optional[str]:
    """Read the saved session string for ``username``.

    Returns:
        The file contents, or None when no session file exists.
    """
    try:
        with open(_session_path(username), encoding="UTF-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.debug("No existing session found for %s", username)
        return None


def save_session(username: str, session_string: str) -> None:
    """Write ``session_string`` to the session file for ``username``."""
    with open(_session_path(username), "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug("Session saved for %s", username)


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Persist the session whenever it is created or refreshed.

    Args:
        username: Account the session belongs to (selects the session file).
        event: The session event reported by the client.
        session: The session object; exported and saved on CREATE/REFRESH.
    """
    # Deliberately log only the event: the session object carries credentials
    # that should not end up in log output.
    logger.debug("Session changed for %s: %s", username, event)
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug("Saving changed session for %s", username)
        save_session(username, session.export())


def init_client(username: str, password: str) -> Client:
    """Create a logged-in client, reusing a saved session when one exists.

    The PDS is taken from the ``PDS_URI`` environment variable, falling back to
    bsky.social (with a warning) when it is unset.

    Args:
        username: Account handle; also selects the session file.
        password: Account password, used only when no saved session exists.

    Returns:
        The authenticated client.
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = _DEFAULT_PDS_URI

    logger.debug("Using PDS URI: %s", pds_uri)

    client = Client(pds_uri)
    # Register before login so the session created by login is saved as well
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug("Reusing existing session for %s", username)
        client.login(session_string=session_string)
    else:
        logger.debug("Creating new session for %s", username)
        client.login(username, password)

    return client


def default_login() -> Client:
    """Log in with the ``BSKY_USERNAME`` / ``BSKY_PASSWORD`` environment variables.

    Returns:
        The authenticated client.

    Raises:
        SystemExit: With status 1 when either variable is missing.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    if username is None:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        # Non-zero status: this is a configuration error, not a clean exit
        raise SystemExit(1)

    if password is None:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        raise SystemExit(1)

    return init_client(username, password)


def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Args:
        text: The text to process

    Returns:
        The whitespace-stripped text with one pair of enclosing double quotes
        removed. Inputs shorter than two characters are returned unchanged.
    """
    if not text or len(text) < 2:
        return text

    stripped = text.strip()

    # Re-check the length after stripping: a lone '"' both starts and ends with
    # a quote, and slicing it would wrongly produce an empty string.
    if len(stripped) >= 2 and stripped.startswith('"') and stripped.endswith('"'):
        return stripped[1:-1]

    return stripped


def _parse_facets(client: Client, text: str) -> list:
    """Build rich-text facets (mentions first, then links) for ``text``.

    Mentions whose handle cannot be resolved are skipped (logged at debug).
    """
    text_bytes = text.encode("UTF-8")
    facets = []

    for match in _MENTION_REGEX.finditer(text_bytes):
        handle = match.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        try:
            # Resolve handle to DID using the API
            profile = client.app.bsky.actor.get_profile({"actor": handle})
            if profile and hasattr(profile, "did"):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        # Group 1 offsets exclude the optional leading delimiter
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            byteStart=match.start(1),
                            byteEnd=match.end(1),
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=profile.did)],
                    )
                )
        except Exception as e:
            logger.debug("Failed to resolve handle %s: %s", handle, e)
            continue

    for match in _URL_REGEX.finditer(text_bytes):
        url = match.group(1).decode("UTF-8")
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=match.start(1),
                    byteEnd=match.end(1),
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)],
            )
        )

    return facets


def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Mentions and URLs found in ``text`` are attached as facets.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None,
            the parent is used as root (both URI and CID).
        root_cid: The CID of the root post (if replying to a reply). Only
            replaced by reply_to_cid when root_uri is None.
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from sending the post
    """
    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions and URLs)
    facets = _parse_facets(client, text)

    # An empty facet list is sent as None, i.e. as if no facets were given
    response = client.send_post(
        text=text,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        facets=facets or None,
        langs=[lang] if lang else None,
    )

    logger.info("Reply sent successfully: %s", response.uri)
    return response


def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread data or None if the request raised
    """
    try:
        return client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
    except Exception as e:
        logger.error("Error fetching post thread: %s", e)
        return None


def _resolve_reply_targets(notification: Any) -> Optional[Tuple[Any, Any, Any, Any]]:
    """Extract ``(post_uri, post_cid, root_uri, root_cid)`` from a notification.

    Accepts either a dict or an object with ``uri``/``cid`` attributes. When the
    notification's record carries reply info with a usable root, that root is
    returned; otherwise the notification's own post is treated as the root.

    Returns:
        The four-tuple, or None when the notification lacks a uri or cid.
    """
    if isinstance(notification, dict):
        post_uri = notification.get('uri')
        post_cid = notification.get('cid')
        record = notification.get('record', {})
        reply_info = record.get('reply') if isinstance(record, dict) else None
    elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
        post_uri = notification.uri
        post_cid = notification.cid
        reply_info = None
        if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
            reply_info = notification.record.reply
    else:
        return None

    if not post_uri or not post_cid:
        return None

    # Default: no (usable) reply info means this post IS the root
    root_uri, root_cid = post_uri, post_cid

    if isinstance(reply_info, dict):
        root_ref = reply_info.get('root')
        if root_ref and isinstance(root_ref, dict):
            root_uri = root_ref.get('uri', post_uri)
            root_cid = root_ref.get('cid', post_cid)
    elif reply_info and hasattr(reply_info, 'root'):
        root = reply_info.root
        if hasattr(root, 'uri') and hasattr(root, 'cid'):
            root_uri, root_cid = root.uri, root.cid

    return post_uri, post_cid, root_uri, root_cid


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply or None if failed
    """
    try:
        targets = _resolve_reply_targets(notification)
        if targets is None:
            logger.error("Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = targets

        # Reply to the notification
        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang,
        )

    except Exception as e:
        logger.error("Error replying to notification: %s", e)
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previous one; all share the same
    root as the notification's post.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 15 messages). Message length
            is not checked here; the caller must keep each message within the
            platform's limit.
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses from sending the replies, or None if validation
        failed or an exception was raised while posting
    """
    try:
        # Validate input
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > _MAX_THREAD_REPLIES:
            logger.error("Cannot send more than 15 reply messages (got %s)", len(reply_messages))
            return None

        targets = _resolve_reply_targets(notification)
        if targets is None:
            logger.error("Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = targets

        # Send replies in sequence, creating a thread
        responses = []
        current_parent_uri = post_uri
        current_parent_cid = post_cid
        total = len(reply_messages)

        for position, message in enumerate(reply_messages, start=1):
            logger.info("Sending reply %s/%s: %s...", position, total, message[:50])

            response = reply_to_post(
                client=client,
                text=message,
                reply_to_uri=current_parent_uri,
                reply_to_cid=current_parent_cid,
                root_uri=root_uri,
                root_cid=root_cid,
                lang=lang,
            )

            if not response:
                # Defensive path for a falsy response; exceptions raised by
                # reply_to_post are handled by the outer except instead.
                logger.error("Failed to send reply %s, posting system failure message", position)
                response = reply_to_post(
                    client=client,
                    text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                    reply_to_uri=current_parent_uri,
                    reply_to_cid=current_parent_cid,
                    root_uri=root_uri,
                    root_cid=root_cid,
                    lang=lang,
                )
                if not response:
                    logger.error("Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The post just sent becomes the parent of the next reply
            current_parent_uri = response.uri
            current_parent_cid = response.cid

        logger.info("Successfully sent %s threaded replies", len(responses))
        return responses

    except Exception as e:
        logger.error("Error sending threaded reply to notification: %s", e)
        return None


def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO 8601 string with a ``Z`` suffix."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _get_session_credentials(client: Client) -> Optional[Tuple[str, str]]:
    """Return ``(access_token, user_did)`` from the client, or None (logged).

    Looks at ``client._session`` first, then at ``access_jwt``/``did``
    attributes directly on the client.
    """
    if hasattr(client, '_session') and client._session:
        access_token = client._session.access_jwt
        user_did = client._session.did
    elif hasattr(client, 'access_jwt'):
        access_token = client.access_jwt
        user_did = client.did if hasattr(client, 'did') else None
    else:
        logger.error("Cannot access client session information")
        return None

    if not access_token or not user_did:
        logger.error("Missing access token or DID from session")
        return None

    return access_token, user_did


def _create_record(client: Client, collection: str, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """POST ``record`` to ``com.atproto.repo.createRecord`` in ``collection``.

    Returns:
        The decoded JSON response, or None when session credentials are
        unavailable.

    Raises:
        Exception: Whatever ``requests`` raises for transport errors or a
            non-success status; callers translate these into a None result.
    """
    # Local import, as in the rest of this module: requests is only needed here
    import requests

    credentials = _get_session_credentials(client)
    if credentials is None:
        return None
    access_token, user_did = credentials

    pds_host = os.getenv("PDS_URI", _DEFAULT_PDS_URI).rstrip("/")
    response = requests.post(
        f"{pds_host}/xrpc/com.atproto.repo.createRecord",
        headers={"Authorization": f"Bearer {access_token}"},
        json={"repo": user_did, "collection": collection, "record": record},
        timeout=_RECORD_REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        # Log the body before raising: it carries the server's error detail
        logger.error(
            "Record creation failed for %s: %s - %s",
            collection,
            response.status_code,
            response.text,
        )
    response.raise_for_status()
    return response.json()


def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record for synthesis without a target post.

    This creates a synthesis acknowledgment with null subject field.

    Args:
        client: Authenticated Bluesky client
        note: The synthesis note/content

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        result = _create_record(
            client,
            "stream.thought.ack",
            {
                "$type": "stream.thought.ack",
                "subject": None,  # Null subject for synthesis
                "createdAt": _utc_timestamp(),
                "note": note,
            },
        )
    except Exception as e:
        logger.error("Error creating synthesis acknowledgment: %s", e)
        return None

    if result is not None:
        logger.info("Successfully created synthesis acknowledgment")
    return result


def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    This creates a custom acknowledgment record instead of a standard Bluesky like,
    allowing void to track which posts it has engaged with.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        result = _create_record(
            client,
            "stream.thought.ack",
            {
                "$type": "stream.thought.ack",
                "subject": {
                    "uri": post_uri,
                    "cid": post_cid,
                },
                "createdAt": _utc_timestamp(),
                "note": note,  # Will be null if no note provided
            },
        )
    except Exception as e:
        logger.error("Error acknowledging post: %s", e)
        return None

    if result is not None:
        logger.info("Successfully acknowledged post: %s", post_uri)
    return result


def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.tool.call record to track tool usage.

    This creates a record of tool calls made by void during processing,
    allowing for analysis of tool usage patterns and debugging.

    Args:
        client: Authenticated Bluesky client
        tool_name: Name of the tool being called
        arguments: Raw JSON string of the tool arguments
        tool_call_id: Optional ID of the tool call for correlation

    Returns:
        The response from creating the tool call record or None if failed
    """
    try:
        tool_record = {
            "$type": "stream.thought.tool.call",
            "tool_name": tool_name,
            "arguments": arguments,  # Store as string to avoid parsing issues
            "createdAt": _utc_timestamp(),
        }

        # Add tool_call_id if provided
        if tool_call_id:
            tool_record["tool_call_id"] = tool_call_id

        result = _create_record(client, "stream.thought.tool.call", tool_record)
    except Exception as e:
        logger.error("Error creating tool call record: %s", e)
        return None

    if result is not None:
        logger.debug("Successfully recorded tool call: %s", tool_name)
    return result


def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.reasoning record to track agent reasoning.

    This creates a record of void's reasoning during message processing,
    providing transparency into the decision-making process.

    Args:
        client: Authenticated Bluesky client
        reasoning_text: The reasoning text from the agent

    Returns:
        The response from creating the reasoning record or None if failed
    """
    try:
        result = _create_record(
            client,
            "stream.thought.reasoning",
            {
                "$type": "stream.thought.reasoning",
                "reasoning": reasoning_text,
                "createdAt": _utc_timestamp(),
            },
        )
        if result is not None:
            logger.debug("Successfully recorded reasoning (length: %s chars)", len(reasoning_text))
        return result
    except Exception as e:
        logger.error("Error creating reasoning record: %s", e)
        return None


if __name__ == "__main__":
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")