a digital person for bluesky
at x 33 kB view raw
1import os 2import logging 3from typing import Optional, Dict, Any, List 4from atproto_client import Client, Session, SessionEvent, models 5 6# Configure logging 7logging.basicConfig( 8 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 9) 10logger = logging.getLogger("bluesky_session_handler") 11 12# Load the environment variables 13import dotenv 14dotenv.load_dotenv(override=True) 15 16import yaml 17import json 18 19# Strip fields. A list of fields to remove from a JSON object 20STRIP_FIELDS = [ 21 "cid", 22 "rev", 23 "did", 24 "uri", 25 "langs", 26 "threadgate", 27 "py_type", 28 "labels", 29 "avatar", 30 "viewer", 31 "indexed_at", 32 "tags", 33 "associated", 34 "thread_context", 35 "aspect_ratio", 36 "thumb", 37 "fullsize", 38 "root", 39 "created_at", 40 "verification", 41 "like_count", 42 "quote_count", 43 "reply_count", 44 "repost_count", 45 "embedding_disabled", 46 "thread_muted", 47 "reply_disabled", 48 "pinned", 49 "like", 50 "repost", 51 "blocked_by", 52 "blocking", 53 "blocking_by_list", 54 "followed_by", 55 "following", 56 "known_followers", 57 "muted", 58 "muted_by_list", 59 "root_author_like", 60 "entities", 61 "ref", 62 "mime_type", 63 "size", 64] 65def convert_to_basic_types(obj): 66 """Convert complex Python objects to basic types for JSON/YAML serialization.""" 67 if hasattr(obj, '__dict__'): 68 # Convert objects with __dict__ to their dictionary representation 69 return convert_to_basic_types(obj.__dict__) 70 elif isinstance(obj, dict): 71 return {key: convert_to_basic_types(value) for key, value in obj.items()} 72 elif isinstance(obj, list): 73 return [convert_to_basic_types(item) for item in obj] 74 elif isinstance(obj, (str, int, float, bool)) or obj is None: 75 return obj 76 else: 77 # For other types, try to convert to string 78 return str(obj) 79 80 81def strip_fields(obj, strip_field_list): 82 """Recursively strip fields from a JSON object.""" 83 if isinstance(obj, dict): 84 keys_flagged_for_removal = [] 85 86 # Remove fields from strip list and pydantic metadata 87 for field in list(obj.keys()): 88 if field in strip_field_list or field.startswith("__"): 89 keys_flagged_for_removal.append(field) 90 91 # Remove flagged keys 92 for key in keys_flagged_for_removal: 93 obj.pop(key, None) 94 95 # Recursively process remaining values 96 for key, value in list(obj.items()): 97 obj[key] = strip_fields(value, strip_field_list) 98 # Remove empty/null values after processing 99 if ( 100 obj[key] is None 101 or (isinstance(obj[key], dict) and len(obj[key]) == 0) 102 or (isinstance(obj[key], list) and len(obj[key]) == 0) 103 or (isinstance(obj[key], str) and obj[key].strip() == "") 104 ): 105 obj.pop(key, None) 106 107 elif isinstance(obj, list): 108 for i, value in enumerate(obj): 109 obj[i] = strip_fields(value, strip_field_list) 110 # Remove None values from list 111 obj[:] = [item for item in obj if item is not None] 112 113 return obj 114 115 116def flatten_thread_structure(thread_data): 117 """ 118 Flatten a nested thread structure into a list while preserving all data. 119 120 Args: 121 thread_data: The thread data from get_post_thread 122 123 Returns: 124 Dict with 'posts' key containing a list of posts in chronological order 125 """ 126 posts = [] 127 128 def traverse_thread(node): 129 """Recursively traverse the thread structure to collect posts.""" 130 if not node: 131 return 132 133 # If this node has a parent, traverse it first (to maintain chronological order) 134 if hasattr(node, 'parent') and node.parent: 135 traverse_thread(node.parent) 136 137 # Then add this node's post 138 if hasattr(node, 'post') and node.post: 139 # Convert to dict if needed to ensure we can process it 140 if hasattr(node.post, '__dict__'): 141 post_dict = node.post.__dict__.copy() 142 elif isinstance(node.post, dict): 143 post_dict = node.post.copy() 144 else: 145 post_dict = {} 146 147 posts.append(post_dict) 148 149 # Handle the thread structure 150 if hasattr(thread_data, 'thread'): 151 # Start from the main thread node 152 traverse_thread(thread_data.thread) 153 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__: 154 traverse_thread(thread_data.__dict__['thread']) 155 156 # Return a simple structure with posts list 157 return {'posts': posts} 158 159 160def thread_to_yaml_string(thread, strip_metadata=True): 161 """ 162 Convert thread data to a YAML-formatted string for LLM parsing. 163 164 Args: 165 thread: The thread data from get_post_thread 166 strip_metadata: Whether to strip metadata fields for cleaner output 167 168 Returns: 169 YAML-formatted string representation of the thread 170 """ 171 # First flatten the thread structure to avoid deep nesting 172 flattened = flatten_thread_structure(thread) 173 174 # Convert complex objects to basic types 175 basic_thread = convert_to_basic_types(flattened) 176 177 if strip_metadata: 178 # Create a copy and strip unwanted fields 179 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS) 180 else: 181 cleaned_thread = basic_thread 182 183 return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False) 184 185 186 187 188 189 190 191def get_session(username: str) -> Optional[str]: 192 try: 193 with open(f"session_{username}.txt", encoding="UTF-8") as f: 194 return f.read() 195 except FileNotFoundError: 196 logger.debug(f"No existing session found for {username}") 197 return None 198 199def save_session(username: str, session_string: str) -> None: 200 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f: 201 f.write(session_string) 202 logger.debug(f"Session saved for {username}") 203 204def on_session_change(username: str, event: SessionEvent, session: Session) -> None: 205 logger.debug(f"Session changed: {event} {repr(session)}") 206 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 207 logger.debug(f"Saving changed session for {username}") 208 save_session(username, session.export()) 209 210def init_client(username: str, password: str) -> Client: 211 from config_loader import get_bluesky_config 212 try: 213 bluesky_config = get_bluesky_config() 214 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social') 215 except (ValueError, KeyError): 216 pds_uri = os.getenv("PDS_URI", "https://bsky.social") 217 logger.warning( 218 "Failed to load PDS URI from config. Using environment variable or default." 219 ) 220 221 # Print the PDS URI 222 logger.debug(f"Using PDS URI: {pds_uri}") 223 224 client = Client(pds_uri) 225 client.on_session_change( 226 lambda event, session: on_session_change(username, event, session) 227 ) 228 229 session_string = get_session(username) 230 if session_string: 231 logger.debug(f"Reusing existing session for {username}") 232 client.login(session_string=session_string) 233 else: 234 logger.debug(f"Creating new session for {username}") 235 client.login(username, password) 236 237 return client 238 239 240def default_login(config_path: str = "config.yaml") -> Client: 241 from config_loader import get_bluesky_config 242 try: 243 bluesky_config = get_bluesky_config(config_path) 244 username = bluesky_config['username'] 245 password = bluesky_config['password'] 246 except (ValueError, KeyError) as e: 247 logger.error(f"Failed to load Bluesky configuration: {e}") 248 exit() 249 250 251 if username is None: 252 logger.error( 253 "No username provided. Please provide a username using the BSKY_USERNAME environment variable." 254 ) 255 exit() 256 257 if password is None: 258 logger.error( 259 "No password provided. Please provide a password using the BSKY_PASSWORD environment variable." 260 ) 261 exit() 262 263 return init_client(username, password) 264 265def remove_outside_quotes(text: str) -> str: 266 """ 267 Remove outside double quotes from response text. 268 269 Only handles double quotes to avoid interfering with contractions: 270 - Double quotes: "text" → text 271 - Preserves single quotes and internal quotes 272 273 Args: 274 text: The text to process 275 276 Returns: 277 Text with outside double quotes removed 278 """ 279 if not text or len(text) < 2: 280 return text 281 282 text = text.strip() 283 284 # Only remove double quotes from start and end 285 if text.startswith('"') and text.endswith('"'): 286 return text[1:-1] 287 288 return text 289 290def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]: 291 """ 292 Reply to a post on Bluesky with rich text support. 293 294 Args: 295 client: Authenticated Bluesky client 296 text: The reply text 297 reply_to_uri: The URI of the post being replied to (parent) 298 reply_to_cid: The CID of the post being replied to (parent) 299 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri 300 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid 301 lang: Language code for the post (e.g., 'en-US', 'es', 'ja') 302 303 Returns: 304 The response from sending the post 305 """ 306 import re 307 308 # If root is not provided, this is a reply to the root post 309 if root_uri is None: 310 root_uri = reply_to_uri 311 root_cid = reply_to_cid 312 313 # Create references for the reply 314 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid)) 315 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid)) 316 317 # Parse rich text facets (mentions and URLs) 318 facets = [] 319 text_bytes = text.encode("UTF-8") 320 321 # Parse mentions - fixed to handle @ at start of text 322 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 323 324 for m in re.finditer(mention_regex, text_bytes): 325 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 326 # Adjust byte positions to account for the optional prefix 327 mention_start = m.start(1) 328 mention_end = m.end(1) 329 try: 330 # Resolve handle to DID using the API 331 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle}) 332 if resolve_resp and hasattr(resolve_resp, 'did'): 333 facets.append( 334 models.AppBskyRichtextFacet.Main( 335 index=models.AppBskyRichtextFacet.ByteSlice( 336 byteStart=mention_start, 337 byteEnd=mention_end 338 ), 339 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)] 340 ) 341 ) 342 except Exception as e: 343 logger.debug(f"Failed to resolve handle {handle}: {e}") 344 continue 345 346 # Parse URLs - fixed to handle URLs at start of text 347 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 348 349 for m in re.finditer(url_regex, text_bytes): 350 url = m.group(1).decode("UTF-8") 351 # Adjust byte positions to account for the optional prefix 352 url_start = m.start(1) 353 url_end = m.end(1) 354 facets.append( 355 models.AppBskyRichtextFacet.Main( 356 index=models.AppBskyRichtextFacet.ByteSlice( 357 byteStart=url_start, 358 byteEnd=url_end 359 ), 360 features=[models.AppBskyRichtextFacet.Link(uri=url)] 361 ) 362 ) 363 364 # Send the reply with facets if any were found 365 if facets: 366 response = client.send_post( 367 text=text, 368 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 369 facets=facets, 370 langs=[lang] if lang else None 371 ) 372 else: 373 response = client.send_post( 374 text=text, 375 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 376 langs=[lang] if lang else None 377 ) 378 379 logger.info(f"Reply sent successfully: {response.uri}") 380 return response 381 382 383def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]: 384 """ 385 Get the thread containing a post to find root post information. 386 387 Args: 388 client: Authenticated Bluesky client 389 uri: The URI of the post 390 391 Returns: 392 The thread data or None if not found 393 """ 394 try: 395 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10}) 396 return thread 397 except Exception as e: 398 logger.error(f"Error fetching post thread: {e}") 399 return None 400 401 402def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]: 403 """ 404 Reply to a notification (mention or reply). 405 406 Args: 407 client: Authenticated Bluesky client 408 notification: The notification object from list_notifications 409 reply_text: The text to reply with 410 lang: Language code for the post (defaults to "en-US") 411 412 Returns: 413 The response from sending the reply or None if failed 414 """ 415 try: 416 # Get the post URI and CID from the notification (handle both dict and object) 417 if isinstance(notification, dict): 418 post_uri = notification.get('uri') 419 post_cid = notification.get('cid') 420 # Check if the notification record has reply info with root 421 record = notification.get('record', {}) 422 reply_info = record.get('reply') if isinstance(record, dict) else None 423 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 424 post_uri = notification.uri 425 post_cid = notification.cid 426 # Check if the notification record has reply info with root 427 reply_info = None 428 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 429 reply_info = notification.record.reply 430 else: 431 post_uri = None 432 post_cid = None 433 reply_info = None 434 435 if not post_uri or not post_cid: 436 logger.error("Notification doesn't have required uri/cid fields") 437 return None 438 439 # Determine root: if post has reply info, use its root; otherwise this post IS the root 440 if reply_info: 441 # Extract root from the notification's reply structure 442 if isinstance(reply_info, dict): 443 root_ref = reply_info.get('root') 444 if root_ref and isinstance(root_ref, dict): 445 root_uri = root_ref.get('uri', post_uri) 446 root_cid = root_ref.get('cid', post_cid) 447 else: 448 # No root in reply info, use post as root 449 root_uri = post_uri 450 root_cid = post_cid 451 elif hasattr(reply_info, 'root'): 452 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 453 root_uri = reply_info.root.uri 454 root_cid = reply_info.root.cid 455 else: 456 root_uri = post_uri 457 root_cid = post_cid 458 else: 459 root_uri = post_uri 460 root_cid = post_cid 461 else: 462 # No reply info means this post IS the root 463 root_uri = post_uri 464 root_cid = post_cid 465 466 # Reply to the notification 467 return reply_to_post( 468 client=client, 469 text=reply_text, 470 reply_to_uri=post_uri, 471 reply_to_cid=post_cid, 472 root_uri=root_uri, 473 root_cid=root_cid, 474 lang=lang 475 ) 476 477 except Exception as e: 478 logger.error(f"Error replying to notification: {e}") 479 return None 480 481 482def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]: 483 """ 484 Reply to a notification with a threaded chain of messages (max 15). 485 486 Args: 487 client: Authenticated Bluesky client 488 notification: The notification object from list_notifications 489 reply_messages: List of reply texts (max 15 messages, each max 300 chars) 490 lang: Language code for the posts (defaults to "en-US") 491 492 Returns: 493 List of responses from sending the replies or None if failed 494 """ 495 try: 496 # Validate input 497 if not reply_messages or len(reply_messages) == 0: 498 logger.error("Reply messages list cannot be empty") 499 return None 500 if len(reply_messages) > 15: 501 logger.error(f"Cannot send more than 15 reply messages (got {len(reply_messages)})") 502 return None 503 504 # Get the post URI and CID from the notification (handle both dict and object) 505 if isinstance(notification, dict): 506 post_uri = notification.get('uri') 507 post_cid = notification.get('cid') 508 # Check if the notification record has reply info with root 509 record = notification.get('record', {}) 510 reply_info = record.get('reply') if isinstance(record, dict) else None 511 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 512 post_uri = notification.uri 513 post_cid = notification.cid 514 # Check if the notification record has reply info with root 515 reply_info = None 516 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 517 reply_info = notification.record.reply 518 else: 519 post_uri = None 520 post_cid = None 521 reply_info = None 522 523 if not post_uri or not post_cid: 524 logger.error("Notification doesn't have required uri/cid fields") 525 return None 526 527 # Determine root: if post has reply info, use its root; otherwise this post IS the root 528 if reply_info: 529 # Extract root from the notification's reply structure 530 if isinstance(reply_info, dict): 531 root_ref = reply_info.get('root') 532 if root_ref and isinstance(root_ref, dict): 533 root_uri = root_ref.get('uri', post_uri) 534 root_cid = root_ref.get('cid', post_cid) 535 else: 536 # No root in reply info, use post as root 537 root_uri = post_uri 538 root_cid = post_cid 539 elif hasattr(reply_info, 'root'): 540 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 541 root_uri = reply_info.root.uri 542 root_cid = reply_info.root.cid 543 else: 544 root_uri = post_uri 545 root_cid = post_cid 546 else: 547 root_uri = post_uri 548 root_cid = post_cid 549 else: 550 # No reply info means this post IS the root 551 root_uri = post_uri 552 root_cid = post_cid 553 554 # Send replies in sequence, creating a thread 555 responses = [] 556 current_parent_uri = post_uri 557 current_parent_cid = post_cid 558 559 for i, message in enumerate(reply_messages): 560 logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...") 561 562 # Send this reply 563 response = reply_to_post( 564 client=client, 565 text=message, 566 reply_to_uri=current_parent_uri, 567 reply_to_cid=current_parent_cid, 568 root_uri=root_uri, 569 root_cid=root_cid, 570 lang=lang 571 ) 572 573 if not response: 574 logger.error(f"Failed to send reply {i+1}, posting system failure message") 575 # Try to post a system failure message 576 failure_response = reply_to_post( 577 client=client, 578 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]", 579 reply_to_uri=current_parent_uri, 580 reply_to_cid=current_parent_cid, 581 root_uri=root_uri, 582 root_cid=root_cid, 583 lang=lang 584 ) 585 if failure_response: 586 responses.append(failure_response) 587 current_parent_uri = failure_response.uri 588 current_parent_cid = failure_response.cid 589 else: 590 logger.error("Could not even send system failure message, stopping thread") 591 return responses if responses else None 592 else: 593 responses.append(response) 594 # Update parent references for next reply (if any) 595 if i < len(reply_messages) - 1: # Not the last message 596 current_parent_uri = response.uri 597 current_parent_cid = response.cid 598 599 logger.info(f"Successfully sent {len(responses)} threaded replies") 600 return responses 601 602 except Exception as e: 603 logger.error(f"Error sending threaded reply to notification: {e}") 604 return None 605 606 607def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]: 608 """ 609 Create a stream.thought.ack record for synthesis without a target post. 610 611 This creates a synthesis acknowledgment with null subject field. 612 613 Args: 614 client: Authenticated Bluesky client 615 note: The synthesis note/content 616 617 Returns: 618 The response from creating the acknowledgment record or None if failed 619 """ 620 try: 621 import requests 622 import json 623 from datetime import datetime, timezone 624 625 # Get session info from the client 626 access_token = None 627 user_did = None 628 629 # Try different ways to get the session info 630 if hasattr(client, '_session') and client._session: 631 access_token = client._session.access_jwt 632 user_did = client._session.did 633 elif hasattr(client, 'access_jwt'): 634 access_token = client.access_jwt 635 user_did = client.did if hasattr(client, 'did') else None 636 else: 637 logger.error("Cannot access client session information") 638 return None 639 640 if not access_token or not user_did: 641 logger.error("Missing access token or DID from session") 642 return None 643 644 from config_loader import get_bluesky_config 645 try: 646 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social') 647 except: 648 pds_host = os.getenv("PDS_URI", "https://bsky.social") 649 650 # Create acknowledgment record with null subject 651 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 652 ack_record = { 653 "$type": "stream.thought.ack", 654 "subject": None, # Null subject for synthesis 655 "createdAt": now, 656 "note": note 657 } 658 659 # Create the record 660 headers = {"Authorization": f"Bearer {access_token}"} 661 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 662 663 create_data = { 664 "repo": user_did, 665 "collection": "stream.thought.ack", 666 "record": ack_record 667 } 668 669 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 670 response.raise_for_status() 671 result = response.json() 672 673 logger.info(f"Successfully created synthesis acknowledgment") 674 return result 675 676 except Exception as e: 677 logger.error(f"Error creating synthesis acknowledgment: {e}") 678 return None 679 680 681def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]: 682 """ 683 Create a stream.thought.ack record to acknowledge a post. 684 685 This creates a custom acknowledgment record instead of a standard Bluesky like, 686 allowing void to track which posts it has engaged with. 687 688 Args: 689 client: Authenticated Bluesky client 690 post_uri: The URI of the post to acknowledge 691 post_cid: The CID of the post to acknowledge 692 note: Optional note to attach to the acknowledgment 693 694 Returns: 695 The response from creating the acknowledgment record or None if failed 696 """ 697 try: 698 import requests 699 import json 700 from datetime import datetime, timezone 701 702 # Get session info from the client 703 # The atproto Client stores the session differently 704 access_token = None 705 user_did = None 706 707 # Try different ways to get the session info 708 if hasattr(client, '_session') and client._session: 709 access_token = client._session.access_jwt 710 user_did = client._session.did 711 elif hasattr(client, 'access_jwt'): 712 access_token = client.access_jwt 713 user_did = client.did if hasattr(client, 'did') else None 714 else: 715 logger.error("Cannot access client session information") 716 return None 717 718 if not access_token or not user_did: 719 logger.error("Missing access token or DID from session") 720 return None 721 722 from config_loader import get_bluesky_config 723 try: 724 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social') 725 except: 726 pds_host = os.getenv("PDS_URI", "https://bsky.social") 727 728 # Create acknowledgment record with stream.thought.ack type 729 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 730 ack_record = { 731 "$type": "stream.thought.ack", 732 "subject": { 733 "uri": post_uri, 734 "cid": post_cid 735 }, 736 "createdAt": now, 737 "note": note # Will be null if no note provided 738 } 739 740 # Create the record 741 headers = {"Authorization": f"Bearer {access_token}"} 742 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 743 744 create_data = { 745 "repo": user_did, 746 "collection": "stream.thought.ack", 747 "record": ack_record 748 } 749 750 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 751 response.raise_for_status() 752 result = response.json() 753 754 logger.info(f"Successfully acknowledged post: {post_uri}") 755 return result 756 757 except Exception as e: 758 logger.error(f"Error acknowledging post: {e}") 759 return None 760 761 762def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 763 """ 764 Create a stream.thought.tool_call record to track tool usage. 765 766 This creates a record of tool calls made by void during processing, 767 allowing for analysis of tool usage patterns and debugging. 768 769 Args: 770 client: Authenticated Bluesky client 771 tool_name: Name of the tool being called 772 arguments: Raw JSON string of the tool arguments 773 tool_call_id: Optional ID of the tool call for correlation 774 775 Returns: 776 The response from creating the tool call record or None if failed 777 """ 778 try: 779 import requests 780 import json 781 from datetime import datetime, timezone 782 783 # Get session info from the client 784 access_token = None 785 user_did = None 786 787 # Try different ways to get the session info 788 if hasattr(client, '_session') and client._session: 789 access_token = client._session.access_jwt 790 user_did = client._session.did 791 elif hasattr(client, 'access_jwt'): 792 access_token = client.access_jwt 793 user_did = client.did if hasattr(client, 'did') else None 794 else: 795 logger.error("Cannot access client session information") 796 return None 797 798 if not access_token or not user_did: 799 logger.error("Missing access token or DID from session") 800 return None 801 802 from config_loader import get_bluesky_config 803 try: 804 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social') 805 except: 806 pds_host = os.getenv("PDS_URI", "https://bsky.social") 807 808 # Create tool call record 809 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 810 tool_record = { 811 "$type": "stream.thought.tool.call", 812 "tool_name": tool_name, 813 "arguments": arguments, # Store as string to avoid parsing issues 814 "createdAt": now 815 } 816 817 # Add tool_call_id if provided 818 if tool_call_id: 819 tool_record["tool_call_id"] = tool_call_id 820 821 # Create the record 822 headers = {"Authorization": f"Bearer {access_token}"} 823 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 824 825 create_data = { 826 "repo": user_did, 827 "collection": "stream.thought.tool.call", 828 "record": tool_record 829 } 830 831 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 832 if response.status_code != 200: 833 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}") 834 response.raise_for_status() 835 result = response.json() 836 837 logger.debug(f"Successfully recorded tool call: {tool_name}") 838 return result 839 840 except Exception as e: 841 logger.error(f"Error creating tool call record: {e}") 842 return None 843 844 845def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]: 846 """ 847 Create a stream.thought.reasoning record to track agent reasoning. 848 849 This creates a record of void's reasoning during message processing, 850 providing transparency into the decision-making process. 851 852 Args: 853 client: Authenticated Bluesky client 854 reasoning_text: The reasoning text from the agent 855 856 Returns: 857 The response from creating the reasoning record or None if failed 858 """ 859 try: 860 import requests 861 import json 862 from datetime import datetime, timezone 863 864 # Get session info from the client 865 access_token = None 866 user_did = None 867 868 # Try different ways to get the session info 869 if hasattr(client, '_session') and client._session: 870 access_token = client._session.access_jwt 871 user_did = client._session.did 872 elif hasattr(client, 'access_jwt'): 873 access_token = client.access_jwt 874 user_did = client.did if hasattr(client, 'did') else None 875 else: 876 logger.error("Cannot access client session information") 877 return None 878 879 if not access_token or not user_did: 880 logger.error("Missing access token or DID from session") 881 return None 882 883 from config_loader import get_bluesky_config 884 try: 885 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social') 886 except: 887 pds_host = os.getenv("PDS_URI", "https://bsky.social") 888 889 # Create reasoning record 890 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 891 reasoning_record = { 892 "$type": "stream.thought.reasoning", 893 "reasoning": reasoning_text, 894 "createdAt": now 895 } 896 897 # Create the record 898 headers = {"Authorization": f"Bearer {access_token}"} 899 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 900 901 create_data = { 902 "repo": user_did, 903 "collection": "stream.thought.reasoning", 904 "record": reasoning_record 905 } 906 907 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 908 response.raise_for_status() 909 result = response.json() 910 911 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)") 912 return result 913 914 except Exception as e: 915 logger.error(f"Error creating reasoning record: {e}") 916 return None 917 918 919if __name__ == "__main__": 920 client = default_login() 921 # do something with the client 922 logger.info("Client is ready to use!")