a digital person for bluesky
1import os 2import logging 3import uuid 4import time 5from typing import Optional, Dict, Any, List 6from atproto_client import Client, Session, SessionEvent, models 7 8# Configure logging 9logging.basicConfig( 10 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 11) 12logger = logging.getLogger("bluesky_session_handler") 13 14# Load the environment variables 15import dotenv 16dotenv.load_dotenv(override=True) 17 18import yaml 19import json 20 21# Strip fields. A list of fields to remove from a JSON object 22STRIP_FIELDS = [ 23 "cid", 24 "rev", 25 "did", 26 "uri", 27 "langs", 28 "threadgate", 29 "py_type", 30 "labels", 31 "avatar", 32 "viewer", 33 "indexed_at", 34 "tags", 35 "associated", 36 "thread_context", 37 "aspect_ratio", 38 "thumb", 39 "fullsize", 40 "root", 41 "created_at", 42 "verification", 43 "like_count", 44 "quote_count", 45 "reply_count", 46 "repost_count", 47 "embedding_disabled", 48 "thread_muted", 49 "reply_disabled", 50 "pinned", 51 "like", 52 "repost", 53 "blocked_by", 54 "blocking", 55 "blocking_by_list", 56 "followed_by", 57 "following", 58 "known_followers", 59 "muted", 60 "muted_by_list", 61 "root_author_like", 62 "entities", 63 "ref", 64 "mime_type", 65 "size", 66] 67def convert_to_basic_types(obj): 68 """Convert complex Python objects to basic types for JSON/YAML serialization.""" 69 if hasattr(obj, '__dict__'): 70 # Convert objects with __dict__ to their dictionary representation 71 return convert_to_basic_types(obj.__dict__) 72 elif isinstance(obj, dict): 73 return {key: convert_to_basic_types(value) for key, value in obj.items()} 74 elif isinstance(obj, list): 75 return [convert_to_basic_types(item) for item in obj] 76 elif isinstance(obj, (str, int, float, bool)) or obj is None: 77 return obj 78 else: 79 # For other types, try to convert to string 80 return str(obj) 81 82 83def strip_fields(obj, strip_field_list): 84 """Recursively strip fields from a JSON object.""" 85 if isinstance(obj, dict): 86 keys_flagged_for_removal = [] 87 88 # Remove fields from strip list and pydantic metadata 89 for field in list(obj.keys()): 90 if field in strip_field_list or field.startswith("__"): 91 keys_flagged_for_removal.append(field) 92 93 # Remove flagged keys 94 for key in keys_flagged_for_removal: 95 obj.pop(key, None) 96 97 # Recursively process remaining values 98 for key, value in list(obj.items()): 99 obj[key] = strip_fields(value, strip_field_list) 100 # Remove empty/null values after processing 101 if ( 102 obj[key] is None 103 or (isinstance(obj[key], dict) and len(obj[key]) == 0) 104 or (isinstance(obj[key], list) and len(obj[key]) == 0) 105 or (isinstance(obj[key], str) and obj[key].strip() == "") 106 ): 107 obj.pop(key, None) 108 109 elif isinstance(obj, list): 110 for i, value in enumerate(obj): 111 obj[i] = strip_fields(value, strip_field_list) 112 # Remove None values from list 113 obj[:] = [item for item in obj if item is not None] 114 115 return obj 116 117 118def flatten_thread_structure(thread_data): 119 """ 120 Flatten a nested thread structure into a list while preserving all data. 121 122 Args: 123 thread_data: The thread data from get_post_thread 124 125 Returns: 126 Dict with 'posts' key containing a list of posts in chronological order 127 """ 128 posts = [] 129 130 def traverse_thread(node): 131 """Recursively traverse the thread structure to collect posts.""" 132 if not node: 133 return 134 135 # If this node has a parent, traverse it first (to maintain chronological order) 136 if hasattr(node, 'parent') and node.parent: 137 traverse_thread(node.parent) 138 139 # Then add this node's post 140 if hasattr(node, 'post') and node.post: 141 # Convert to dict if needed to ensure we can process it 142 if hasattr(node.post, '__dict__'): 143 post_dict = node.post.__dict__.copy() 144 elif isinstance(node.post, dict): 145 post_dict = node.post.copy() 146 else: 147 post_dict = {} 148 149 posts.append(post_dict) 150 151 # Handle the thread structure 152 if hasattr(thread_data, 'thread'): 153 # Start from the main thread node 154 traverse_thread(thread_data.thread) 155 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__: 156 traverse_thread(thread_data.__dict__['thread']) 157 158 # Return a simple structure with posts list 159 return {'posts': posts} 160 161 162def thread_to_yaml_string(thread, strip_metadata=True): 163 """ 164 Convert thread data to a YAML-formatted string for LLM parsing. 165 166 Args: 167 thread: The thread data from get_post_thread 168 strip_metadata: Whether to strip metadata fields for cleaner output 169 170 Returns: 171 YAML-formatted string representation of the thread 172 """ 173 # First flatten the thread structure to avoid deep nesting 174 flattened = flatten_thread_structure(thread) 175 176 # Convert complex objects to basic types 177 basic_thread = convert_to_basic_types(flattened) 178 179 if strip_metadata: 180 # Create a copy and strip unwanted fields 181 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS) 182 else: 183 cleaned_thread = basic_thread 184 185 return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False) 186 187 188 189 190 191 192 193def get_session(username: str) -> Optional[str]: 194 try: 195 with open(f"session_{username}.txt", encoding="UTF-8") as f: 196 return f.read() 197 except FileNotFoundError: 198 logger.debug(f"No existing session found for {username}") 199 return None 200 201def save_session(username: str, session_string: str) -> None: 202 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f: 203 f.write(session_string) 204 logger.debug(f"Session saved for {username}") 205 206def on_session_change(username: str, event: SessionEvent, session: Session) -> None: 207 logger.debug(f"Session changed: {event} {repr(session)}") 208 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 209 logger.debug(f"Saving changed session for {username}") 210 save_session(username, session.export()) 211 212def init_client(username: str, password: str) -> Client: 213 pds_uri = os.getenv("PDS_URI") 214 if pds_uri is None: 215 logger.warning( 216 "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable." 217 ) 218 pds_uri = "https://bsky.social" 219 220 # Print the PDS URI 221 logger.debug(f"Using PDS URI: {pds_uri}") 222 223 client = Client(pds_uri) 224 client.on_session_change( 225 lambda event, session: on_session_change(username, event, session) 226 ) 227 228 session_string = get_session(username) 229 if session_string: 230 logger.debug(f"Reusing existing session for {username}") 231 client.login(session_string=session_string) 232 else: 233 logger.debug(f"Creating new session for {username}") 234 client.login(username, password) 235 236 return client 237 238 239def default_login() -> Client: 240 """Login using configuration from config.yaml or environment variables.""" 241 try: 242 from config_loader import get_bluesky_config 243 bluesky_config = get_bluesky_config() 244 245 username = bluesky_config['username'] 246 password = bluesky_config['password'] 247 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social') 248 249 logger.info(f"Logging into Bluesky as {username} via {pds_uri}") 250 251 # Use pds_uri from config 252 client = Client(base_url=pds_uri) 253 client.login(username, password) 254 return client 255 256 except Exception as e: 257 logger.error(f"Failed to load Bluesky configuration: {e}") 258 logger.error("Please check your config.yaml file or environment variables") 259 exit(1) 260 261def remove_outside_quotes(text: str) -> str: 262 """ 263 Remove outside double quotes from response text. 264 265 Only handles double quotes to avoid interfering with contractions: 266 - Double quotes: "text" → text 267 - Preserves single quotes and internal quotes 268 269 Args: 270 text: The text to process 271 272 Returns: 273 Text with outside double quotes removed 274 """ 275 if not text or len(text) < 2: 276 return text 277 278 text = text.strip() 279 280 # Only remove double quotes from start and end 281 if text.startswith('"') and text.endswith('"'): 282 return text[1:-1] 283 284 return text 285 286def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]: 287 """ 288 Reply to a post on Bluesky with rich text support. 289 290 Args: 291 client: Authenticated Bluesky client 292 text: The reply text 293 reply_to_uri: The URI of the post being replied to (parent) 294 reply_to_cid: The CID of the post being replied to (parent) 295 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri 296 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid 297 lang: Language code for the post (e.g., 'en-US', 'es', 'ja') 298 correlation_id: Unique ID for tracking this message through the pipeline 299 300 Returns: 301 The response from sending the post 302 """ 303 import re 304 305 # Generate correlation ID if not provided 306 if correlation_id is None: 307 correlation_id = str(uuid.uuid4())[:8] 308 309 # Enhanced logging with structured data 310 logger.info(f"[{correlation_id}] Starting reply_to_post", extra={ 311 'correlation_id': correlation_id, 312 'text_length': len(text), 313 'text_preview': text[:100] + '...' if len(text) > 100 else text, 314 'reply_to_uri': reply_to_uri, 315 'root_uri': root_uri, 316 'lang': lang 317 }) 318 319 start_time = time.time() 320 321 # If root is not provided, this is a reply to the root post 322 if root_uri is None: 323 root_uri = reply_to_uri 324 root_cid = reply_to_cid 325 326 # Create references for the reply 327 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid)) 328 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid)) 329 330 # Parse rich text facets (mentions and URLs) 331 facets = [] 332 text_bytes = text.encode("UTF-8") 333 mentions_found = [] 334 urls_found = [] 335 336 logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)") 337 338 # Parse mentions - fixed to handle @ at start of text 339 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 340 341 for m in re.finditer(mention_regex, text_bytes): 342 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 343 mentions_found.append(handle) 344 # Adjust byte positions to account for the optional prefix 345 mention_start = m.start(1) 346 mention_end = m.end(1) 347 try: 348 # Resolve handle to DID using the API 349 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle}) 350 if resolve_resp and hasattr(resolve_resp, 'did'): 351 facets.append( 352 models.AppBskyRichtextFacet.Main( 353 index=models.AppBskyRichtextFacet.ByteSlice( 354 byteStart=mention_start, 355 byteEnd=mention_end 356 ), 357 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)] 358 ) 359 ) 360 logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}") 361 except Exception as e: 362 logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}") 363 continue 364 365 # Parse URLs - fixed to handle URLs at start of text 366 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 367 368 for m in re.finditer(url_regex, text_bytes): 369 url = m.group(1).decode("UTF-8") 370 urls_found.append(url) 371 # Adjust byte positions to account for the optional prefix 372 url_start = m.start(1) 373 url_end = m.end(1) 374 facets.append( 375 models.AppBskyRichtextFacet.Main( 376 index=models.AppBskyRichtextFacet.ByteSlice( 377 byteStart=url_start, 378 byteEnd=url_end 379 ), 380 features=[models.AppBskyRichtextFacet.Link(uri=url)] 381 ) 382 ) 383 logger.debug(f"[{correlation_id}] Found URL: {url}") 384 385 logger.debug(f"[{correlation_id}] Facet parsing complete", extra={ 386 'correlation_id': correlation_id, 387 'mentions_count': len(mentions_found), 388 'mentions': mentions_found, 389 'urls_count': len(urls_found), 390 'urls': urls_found, 391 'total_facets': len(facets) 392 }) 393 394 # Send the reply with facets if any were found 395 logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={ 396 'correlation_id': correlation_id, 397 'has_facets': bool(facets), 398 'facet_count': len(facets), 399 'lang': lang 400 }) 401 402 try: 403 if facets: 404 response = client.send_post( 405 text=text, 406 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 407 facets=facets, 408 langs=[lang] if lang else None 409 ) 410 else: 411 response = client.send_post( 412 text=text, 413 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 414 langs=[lang] if lang else None 415 ) 416 417 # Calculate response time 418 response_time = time.time() - start_time 419 420 # Extract post URL for user-friendly logging 421 post_url = None 422 if hasattr(response, 'uri') and response.uri: 423 # Convert AT-URI to web URL 424 # Format: at://did:plc:xxx/app.bsky.feed.post/xxx -> https://bsky.app/profile/handle/post/xxx 425 try: 426 uri_parts = response.uri.split('/') 427 if len(uri_parts) >= 4 and uri_parts[3] == 'app.bsky.feed.post': 428 rkey = uri_parts[4] 429 # We'd need to resolve DID to handle, but for now just use the URI 430 post_url = f"bsky://post/{rkey}" 431 except: 432 pass 433 434 logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" + 435 (f" - URL: {post_url}" if post_url else ""), extra={ 436 'correlation_id': correlation_id, 437 'response_time': round(response_time, 3), 438 'post_uri': response.uri, 439 'post_url': post_url, 440 'post_cid': getattr(response, 'cid', None), 441 'text_length': len(text) 442 }) 443 444 return response 445 446 except Exception as e: 447 response_time = time.time() - start_time 448 logger.error(f"[{correlation_id}] Failed to send reply", extra={ 449 'correlation_id': correlation_id, 450 'error': str(e), 451 'error_type': type(e).__name__, 452 'response_time': round(response_time, 3), 453 'text_length': len(text) 454 }) 455 raise 456 457 458def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]: 459 """ 460 Get the thread containing a post to find root post information. 461 462 Args: 463 client: Authenticated Bluesky client 464 uri: The URI of the post 465 466 Returns: 467 The thread data or None if not found 468 """ 469 try: 470 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10}) 471 return thread 472 except Exception as e: 473 logger.error(f"Error fetching post thread: {e}") 474 return None 475 476 477def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 478 """ 479 Reply to a notification (mention or reply). 480 481 Args: 482 client: Authenticated Bluesky client 483 notification: The notification object from list_notifications 484 reply_text: The text to reply with 485 lang: Language code for the post (defaults to "en-US") 486 correlation_id: Unique ID for tracking this message through the pipeline 487 488 Returns: 489 The response from sending the reply or None if failed 490 """ 491 # Generate correlation ID if not provided 492 if correlation_id is None: 493 correlation_id = str(uuid.uuid4())[:8] 494 495 logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={ 496 'correlation_id': correlation_id, 497 'reply_length': len(reply_text), 498 'lang': lang 499 }) 500 501 try: 502 # Get the post URI and CID from the notification (handle both dict and object) 503 if isinstance(notification, dict): 504 post_uri = notification.get('uri') 505 post_cid = notification.get('cid') 506 # Check if the notification record has reply info with root 507 record = notification.get('record', {}) 508 reply_info = record.get('reply') if isinstance(record, dict) else None 509 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 510 post_uri = notification.uri 511 post_cid = notification.cid 512 # Check if the notification record has reply info with root 513 reply_info = None 514 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 515 reply_info = notification.record.reply 516 else: 517 post_uri = None 518 post_cid = None 519 reply_info = None 520 521 if not post_uri or not post_cid: 522 logger.error("Notification doesn't have required uri/cid fields") 523 return None 524 525 # Determine root: if post has reply info, use its root; otherwise this post IS the root 526 if reply_info: 527 # Extract root from the notification's reply structure 528 if isinstance(reply_info, dict): 529 root_ref = reply_info.get('root') 530 if root_ref and isinstance(root_ref, dict): 531 root_uri = root_ref.get('uri', post_uri) 532 root_cid = root_ref.get('cid', post_cid) 533 else: 534 # No root in reply info, use post as root 535 root_uri = post_uri 536 root_cid = post_cid 537 elif hasattr(reply_info, 'root'): 538 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 539 root_uri = reply_info.root.uri 540 root_cid = reply_info.root.cid 541 else: 542 root_uri = post_uri 543 root_cid = post_cid 544 else: 545 root_uri = post_uri 546 root_cid = post_cid 547 else: 548 # No reply info means this post IS the root 549 root_uri = post_uri 550 root_cid = post_cid 551 552 # Reply to the notification 553 return reply_to_post( 554 client=client, 555 text=reply_text, 556 reply_to_uri=post_uri, 557 reply_to_cid=post_cid, 558 root_uri=root_uri, 559 root_cid=root_cid, 560 lang=lang, 561 correlation_id=correlation_id 562 ) 563 564 except Exception as e: 565 logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={ 566 'correlation_id': correlation_id, 567 'error': str(e), 568 'error_type': type(e).__name__ 569 }) 570 return None 571 572 573def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]: 574 """ 575 Reply to a notification with a threaded chain of messages (max 15). 576 577 Args: 578 client: Authenticated Bluesky client 579 notification: The notification object from list_notifications 580 reply_messages: List of reply texts (max 15 messages, each max 300 chars) 581 lang: Language code for the posts (defaults to "en-US") 582 correlation_id: Unique ID for tracking this message through the pipeline 583 584 Returns: 585 List of responses from sending the replies or None if failed 586 """ 587 # Generate correlation ID if not provided 588 if correlation_id is None: 589 correlation_id = str(uuid.uuid4())[:8] 590 591 logger.info(f"[{correlation_id}] Starting threaded reply", extra={ 592 'correlation_id': correlation_id, 593 'message_count': len(reply_messages), 594 'total_length': sum(len(msg) for msg in reply_messages), 595 'lang': lang 596 }) 597 598 try: 599 # Validate input 600 if not reply_messages or len(reply_messages) == 0: 601 logger.error(f"[{correlation_id}] Reply messages list cannot be empty") 602 return None 603 if len(reply_messages) > 15: 604 logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})") 605 return None 606 607 # Get the post URI and CID from the notification (handle both dict and object) 608 if isinstance(notification, dict): 609 post_uri = notification.get('uri') 610 post_cid = notification.get('cid') 611 # Check if the notification record has reply info with root 612 record = notification.get('record', {}) 613 reply_info = record.get('reply') if isinstance(record, dict) else None 614 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 615 post_uri = notification.uri 616 post_cid = notification.cid 617 # Check if the notification record has reply info with root 618 reply_info = None 619 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 620 reply_info = notification.record.reply 621 else: 622 post_uri = None 623 post_cid = None 624 reply_info = None 625 626 if not post_uri or not post_cid: 627 logger.error("Notification doesn't have required uri/cid fields") 628 return None 629 630 # Determine root: if post has reply info, use its root; otherwise this post IS the root 631 if reply_info: 632 # Extract root from the notification's reply structure 633 if isinstance(reply_info, dict): 634 root_ref = reply_info.get('root') 635 if root_ref and isinstance(root_ref, dict): 636 root_uri = root_ref.get('uri', post_uri) 637 root_cid = root_ref.get('cid', post_cid) 638 else: 639 # No root in reply info, use post as root 640 root_uri = post_uri 641 root_cid = post_cid 642 elif hasattr(reply_info, 'root'): 643 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 644 root_uri = reply_info.root.uri 645 root_cid = reply_info.root.cid 646 else: 647 root_uri = post_uri 648 root_cid = post_cid 649 else: 650 root_uri = post_uri 651 root_cid = post_cid 652 else: 653 # No reply info means this post IS the root 654 root_uri = post_uri 655 root_cid = post_cid 656 657 # Send replies in sequence, creating a thread 658 responses = [] 659 current_parent_uri = post_uri 660 current_parent_cid = post_cid 661 662 for i, message in enumerate(reply_messages): 663 thread_correlation_id = f"{correlation_id}-{i+1}" 664 logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...") 665 666 # Send this reply 667 response = reply_to_post( 668 client=client, 669 text=message, 670 reply_to_uri=current_parent_uri, 671 reply_to_cid=current_parent_cid, 672 root_uri=root_uri, 673 root_cid=root_cid, 674 lang=lang, 675 correlation_id=thread_correlation_id 676 ) 677 678 if not response: 679 logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message") 680 # Try to post a system failure message 681 failure_response = reply_to_post( 682 client=client, 683 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]", 684 reply_to_uri=current_parent_uri, 685 reply_to_cid=current_parent_cid, 686 root_uri=root_uri, 687 root_cid=root_cid, 688 lang=lang, 689 correlation_id=f"{thread_correlation_id}-FAIL" 690 ) 691 if failure_response: 692 responses.append(failure_response) 693 current_parent_uri = failure_response.uri 694 current_parent_cid = failure_response.cid 695 else: 696 logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread") 697 return responses if responses else None 698 else: 699 responses.append(response) 700 # Update parent references for next reply (if any) 701 if i < len(reply_messages) - 1: # Not the last message 702 current_parent_uri = response.uri 703 current_parent_cid = response.cid 704 705 logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={ 706 'correlation_id': correlation_id, 707 'replies_sent': len(responses), 708 'replies_requested': len(reply_messages) 709 }) 710 return responses 711 712 except Exception as e: 713 logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={ 714 'correlation_id': correlation_id, 715 'error': str(e), 716 'error_type': type(e).__name__, 717 'message_count': len(reply_messages) 718 }) 719 return None 720 721 722def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]: 723 """ 724 Create a stream.thought.ack record for synthesis without a target post. 725 726 This creates a synthesis acknowledgment with null subject field. 727 728 Args: 729 client: Authenticated Bluesky client 730 note: The synthesis note/content 731 732 Returns: 733 The response from creating the acknowledgment record or None if failed 734 """ 735 try: 736 import requests 737 import json 738 from datetime import datetime, timezone 739 740 # Get session info from the client 741 access_token = None 742 user_did = None 743 744 # Try different ways to get the session info 745 if hasattr(client, '_session') and client._session: 746 access_token = client._session.access_jwt 747 user_did = client._session.did 748 elif hasattr(client, 'access_jwt'): 749 access_token = client.access_jwt 750 user_did = client.did if hasattr(client, 'did') else None 751 else: 752 logger.error("Cannot access client session information") 753 return None 754 755 if not access_token or not user_did: 756 logger.error("Missing access token or DID from session") 757 return None 758 759 # Get PDS URI from config instead of environment variables 760 from config_loader import get_bluesky_config 761 bluesky_config = get_bluesky_config() 762 pds_host = bluesky_config['pds_uri'] 763 764 # Create acknowledgment record with null subject 765 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 766 ack_record = { 767 "$type": "stream.thought.ack", 768 "subject": None, # Null subject for synthesis 769 "createdAt": now, 770 "note": note 771 } 772 773 # Create the record 774 headers = {"Authorization": f"Bearer {access_token}"} 775 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 776 777 create_data = { 778 "repo": user_did, 779 "collection": "stream.thought.ack", 780 "record": ack_record 781 } 782 783 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 784 response.raise_for_status() 785 result = response.json() 786 787 logger.info(f"Successfully created synthesis acknowledgment") 788 return result 789 790 except Exception as e: 791 logger.error(f"Error creating synthesis acknowledgment: {e}") 792 return None 793 794 795def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]: 796 """ 797 Create a stream.thought.ack record to acknowledge a post. 798 799 This creates a custom acknowledgment record instead of a standard Bluesky like, 800 allowing void to track which posts it has engaged with. 801 802 Args: 803 client: Authenticated Bluesky client 804 post_uri: The URI of the post to acknowledge 805 post_cid: The CID of the post to acknowledge 806 note: Optional note to attach to the acknowledgment 807 808 Returns: 809 The response from creating the acknowledgment record or None if failed 810 """ 811 try: 812 import requests 813 import json 814 from datetime import datetime, timezone 815 816 # Get session info from the client 817 # The atproto Client stores the session differently 818 access_token = None 819 user_did = None 820 821 # Try different ways to get the session info 822 if hasattr(client, '_session') and client._session: 823 access_token = client._session.access_jwt 824 user_did = client._session.did 825 elif hasattr(client, 'access_jwt'): 826 access_token = client.access_jwt 827 user_did = client.did if hasattr(client, 'did') else None 828 else: 829 logger.error("Cannot access client session information") 830 return None 831 832 if not access_token or not user_did: 833 logger.error("Missing access token or DID from session") 834 return None 835 836 # Get PDS URI from config instead of environment variables 837 from config_loader import get_bluesky_config 838 bluesky_config = get_bluesky_config() 839 pds_host = bluesky_config['pds_uri'] 840 841 # Create acknowledgment record with stream.thought.ack type 842 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 843 ack_record = { 844 "$type": "stream.thought.ack", 845 "subject": { 846 "uri": post_uri, 847 "cid": post_cid 848 }, 849 "createdAt": now, 850 "note": note # Will be null if no note provided 851 } 852 853 # Create the record 854 headers = {"Authorization": f"Bearer {access_token}"} 855 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 856 857 create_data = { 858 "repo": user_did, 859 "collection": "stream.thought.ack", 860 "record": ack_record 861 } 862 863 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 864 response.raise_for_status() 865 result = response.json() 866 867 logger.info(f"Successfully acknowledged post: {post_uri}") 868 return result 869 870 except Exception as e: 871 logger.error(f"Error acknowledging post: {e}") 872 return None 873 874 875def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 876 """ 877 Create a stream.thought.tool_call record to track tool usage. 878 879 This creates a record of tool calls made by void during processing, 880 allowing for analysis of tool usage patterns and debugging. 881 882 Args: 883 client: Authenticated Bluesky client 884 tool_name: Name of the tool being called 885 arguments: Raw JSON string of the tool arguments 886 tool_call_id: Optional ID of the tool call for correlation 887 888 Returns: 889 The response from creating the tool call record or None if failed 890 """ 891 try: 892 import requests 893 import json 894 from datetime import datetime, timezone 895 896 # Get session info from the client 897 access_token = None 898 user_did = None 899 900 # Try different ways to get the session info 901 if hasattr(client, '_session') and client._session: 902 access_token = client._session.access_jwt 903 user_did = client._session.did 904 elif hasattr(client, 'access_jwt'): 905 access_token = client.access_jwt 906 user_did = client.did if hasattr(client, 'did') else None 907 else: 908 logger.error("Cannot access client session information") 909 return None 910 911 if not access_token or not user_did: 912 logger.error("Missing access token or DID from session") 913 return None 914 915 # Get PDS URI from config instead of environment variables 916 from config_loader import get_bluesky_config 917 bluesky_config = get_bluesky_config() 918 pds_host = bluesky_config['pds_uri'] 919 920 # Create tool call record 921 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 922 tool_record = { 923 "$type": "stream.thought.tool.call", 924 "tool_name": tool_name, 925 "arguments": arguments, # Store as string to avoid parsing issues 926 "createdAt": now 927 } 928 929 # Add tool_call_id if provided 930 if tool_call_id: 931 tool_record["tool_call_id"] = tool_call_id 932 933 # Create the record 934 headers = {"Authorization": f"Bearer {access_token}"} 935 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 936 937 create_data = { 938 "repo": user_did, 939 "collection": "stream.thought.tool.call", 940 "record": tool_record 941 } 942 943 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 944 if response.status_code != 200: 945 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}") 946 response.raise_for_status() 947 result = response.json() 948 949 logger.debug(f"Successfully recorded tool call: {tool_name}") 950 return result 951 952 except Exception as e: 953 logger.error(f"Error creating tool call record: {e}") 954 return None 955 956 957def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]: 958 """ 959 Create a stream.thought.reasoning record to track agent reasoning. 960 961 This creates a record of void's reasoning during message processing, 962 providing transparency into the decision-making process. 963 964 Args: 965 client: Authenticated Bluesky client 966 reasoning_text: The reasoning text from the agent 967 968 Returns: 969 The response from creating the reasoning record or None if failed 970 """ 971 try: 972 import requests 973 import json 974 from datetime import datetime, timezone 975 976 # Get session info from the client 977 access_token = None 978 user_did = None 979 980 # Try different ways to get the session info 981 if hasattr(client, '_session') and client._session: 982 access_token = client._session.access_jwt 983 user_did = client._session.did 984 elif hasattr(client, 'access_jwt'): 985 access_token = client.access_jwt 986 user_did = client.did if hasattr(client, 'did') else None 987 else: 988 logger.error("Cannot access client session information") 989 return None 990 991 if not access_token or not user_did: 992 logger.error("Missing access token or DID from session") 993 return None 994 995 # Get PDS URI from config instead of environment variables 996 from config_loader import get_bluesky_config 997 bluesky_config = get_bluesky_config() 998 pds_host = bluesky_config['pds_uri'] 999 1000 # Create reasoning record 1001 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1002 reasoning_record = { 1003 "$type": "stream.thought.reasoning", 1004 "reasoning": reasoning_text, 1005 "createdAt": now 1006 } 1007 1008 # Create the record 1009 headers = {"Authorization": f"Bearer {access_token}"} 1010 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1011 1012 create_data = { 1013 "repo": user_did, 1014 "collection": "stream.thought.reasoning", 1015 "record": reasoning_record 1016 } 1017 1018 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1019 response.raise_for_status() 1020 result = response.json() 1021 1022 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)") 1023 return result 1024 1025 except Exception as e: 1026 logger.error(f"Error creating reasoning record: {e}") 1027 return None 1028 1029 1030if __name__ == "__main__": 1031 client = default_login() 1032 # do something with the client 1033 logger.info("Client is ready to use!")