A digital person for Bluesky.
1import os 2import logging 3import uuid 4import time 5from typing import Optional, Dict, Any, List 6from atproto_client import Client, Session, SessionEvent, models 7 8# Configure logging 9logging.basicConfig( 10 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 11) 12logger = logging.getLogger("bluesky_session_handler") 13 14# Load the environment variables 15import dotenv 16dotenv.load_dotenv(override=True) 17 18import yaml 19import json 20 21# Strip fields. A list of fields to remove from a JSON object 22STRIP_FIELDS = [ 23 "cid", 24 "rev", 25 "did", 26 "uri", 27 "langs", 28 "threadgate", 29 "py_type", 30 "labels", 31 "avatar", 32 "viewer", 33 "indexed_at", 34 "tags", 35 "associated", 36 "thread_context", 37 "aspect_ratio", 38 "thumb", 39 "fullsize", 40 "root", 41 "created_at", 42 "verification", 43 "like_count", 44 "quote_count", 45 "reply_count", 46 "repost_count", 47 "embedding_disabled", 48 "thread_muted", 49 "reply_disabled", 50 "pinned", 51 "like", 52 "repost", 53 "blocked_by", 54 "blocking", 55 "blocking_by_list", 56 "followed_by", 57 "following", 58 "known_followers", 59 "muted", 60 "muted_by_list", 61 "root_author_like", 62 "entities", 63 "ref", 64 "mime_type", 65 "size", 66] 67def convert_to_basic_types(obj): 68 """Convert complex Python objects to basic types for JSON/YAML serialization.""" 69 if hasattr(obj, '__dict__'): 70 # Convert objects with __dict__ to their dictionary representation 71 return convert_to_basic_types(obj.__dict__) 72 elif isinstance(obj, dict): 73 return {key: convert_to_basic_types(value) for key, value in obj.items()} 74 elif isinstance(obj, list): 75 return [convert_to_basic_types(item) for item in obj] 76 elif isinstance(obj, (str, int, float, bool)) or obj is None: 77 return obj 78 else: 79 # For other types, try to convert to string 80 return str(obj) 81 82 83def strip_fields(obj, strip_field_list): 84 """Recursively strip fields from a JSON object.""" 85 if isinstance(obj, dict): 86 
keys_flagged_for_removal = [] 87 88 # Remove fields from strip list and pydantic metadata 89 for field in list(obj.keys()): 90 if field in strip_field_list or field.startswith("__"): 91 keys_flagged_for_removal.append(field) 92 93 # Remove flagged keys 94 for key in keys_flagged_for_removal: 95 obj.pop(key, None) 96 97 # Recursively process remaining values 98 for key, value in list(obj.items()): 99 obj[key] = strip_fields(value, strip_field_list) 100 # Remove empty/null values after processing 101 if ( 102 obj[key] is None 103 or (isinstance(obj[key], dict) and len(obj[key]) == 0) 104 or (isinstance(obj[key], list) and len(obj[key]) == 0) 105 or (isinstance(obj[key], str) and obj[key].strip() == "") 106 ): 107 obj.pop(key, None) 108 109 elif isinstance(obj, list): 110 for i, value in enumerate(obj): 111 obj[i] = strip_fields(value, strip_field_list) 112 # Remove None values from list 113 obj[:] = [item for item in obj if item is not None] 114 115 return obj 116 117 118def flatten_thread_structure(thread_data): 119 """ 120 Flatten a nested thread structure into a list while preserving all data. 
121 122 Args: 123 thread_data: The thread data from get_post_thread 124 125 Returns: 126 Dict with 'posts' key containing a list of posts in chronological order 127 """ 128 posts = [] 129 130 def traverse_thread(node): 131 """Recursively traverse the thread structure to collect posts.""" 132 if not node: 133 return 134 135 # If this node has a parent, traverse it first (to maintain chronological order) 136 if hasattr(node, 'parent') and node.parent: 137 traverse_thread(node.parent) 138 139 # Then add this node's post 140 if hasattr(node, 'post') and node.post: 141 # Convert to dict if needed to ensure we can process it 142 if hasattr(node.post, '__dict__'): 143 post_dict = node.post.__dict__.copy() 144 elif isinstance(node.post, dict): 145 post_dict = node.post.copy() 146 else: 147 post_dict = {} 148 149 posts.append(post_dict) 150 151 # Handle the thread structure 152 if hasattr(thread_data, 'thread'): 153 # Start from the main thread node 154 traverse_thread(thread_data.thread) 155 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__: 156 traverse_thread(thread_data.__dict__['thread']) 157 158 # Return a simple structure with posts list 159 return {'posts': posts} 160 161 162def count_thread_posts(thread): 163 """ 164 Count the number of posts in a thread. 165 166 Args: 167 thread: The thread data from get_post_thread 168 169 Returns: 170 Integer count of posts in the thread 171 """ 172 flattened = flatten_thread_structure(thread) 173 return len(flattened.get('posts', [])) 174 175 176def thread_to_yaml_string(thread, strip_metadata=True): 177 """ 178 Convert thread data to a YAML-formatted string for LLM parsing. 
179 180 Args: 181 thread: The thread data from get_post_thread 182 strip_metadata: Whether to strip metadata fields for cleaner output 183 184 Returns: 185 YAML-formatted string representation of the thread 186 """ 187 # First flatten the thread structure to avoid deep nesting 188 flattened = flatten_thread_structure(thread) 189 190 # Convert complex objects to basic types 191 basic_thread = convert_to_basic_types(flattened) 192 193 if strip_metadata: 194 # Create a copy and strip unwanted fields 195 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS) 196 else: 197 cleaned_thread = basic_thread 198 199 return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False) 200 201 202 203 204 205 206 207def get_session(username: str) -> Optional[str]: 208 try: 209 with open(f"session_{username}.txt", encoding="UTF-8") as f: 210 return f.read() 211 except FileNotFoundError: 212 logger.debug(f"No existing session found for {username}") 213 return None 214 215def save_session(username: str, session_string: str) -> None: 216 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f: 217 f.write(session_string) 218 logger.debug(f"Session saved for {username}") 219 220def on_session_change(username: str, event: SessionEvent, session: Session) -> None: 221 logger.debug(f"Session changed: {event} {repr(session)}") 222 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 223 logger.debug(f"Saving changed session for {username}") 224 save_session(username, session.export()) 225 226def init_client(username: str, password: str) -> Client: 227 pds_uri = os.getenv("PDS_URI") 228 if pds_uri is None: 229 logger.warning( 230 "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable." 
231 ) 232 pds_uri = "https://bsky.social" 233 234 # Print the PDS URI 235 logger.debug(f"Using PDS URI: {pds_uri}") 236 237 client = Client(pds_uri) 238 client.on_session_change( 239 lambda event, session: on_session_change(username, event, session) 240 ) 241 242 session_string = get_session(username) 243 if session_string: 244 logger.debug(f"Reusing existing session for {username}") 245 client.login(session_string=session_string) 246 else: 247 logger.debug(f"Creating new session for {username}") 248 client.login(username, password) 249 250 return client 251 252 253def default_login() -> Client: 254 """Login using configuration from config.yaml or environment variables.""" 255 try: 256 from config_loader import get_bluesky_config 257 bluesky_config = get_bluesky_config() 258 259 username = bluesky_config['username'] 260 password = bluesky_config['password'] 261 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social') 262 263 logger.info(f"Logging into Bluesky as {username} via {pds_uri}") 264 265 # Use pds_uri from config 266 client = Client(base_url=pds_uri) 267 client.login(username, password) 268 return client 269 270 except Exception as e: 271 logger.error(f"Failed to load Bluesky configuration: {e}") 272 logger.error("Please check your config.yaml file or environment variables") 273 exit(1) 274 275def remove_outside_quotes(text: str) -> str: 276 """ 277 Remove outside double quotes from response text. 
278 279 Only handles double quotes to avoid interfering with contractions: 280 - Double quotes: "text" → text 281 - Preserves single quotes and internal quotes 282 283 Args: 284 text: The text to process 285 286 Returns: 287 Text with outside double quotes removed 288 """ 289 if not text or len(text) < 2: 290 return text 291 292 text = text.strip() 293 294 # Only remove double quotes from start and end 295 if text.startswith('"') and text.endswith('"'): 296 return text[1:-1] 297 298 return text 299 300def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]: 301 """ 302 Reply to a post on Bluesky with rich text support. 303 304 Args: 305 client: Authenticated Bluesky client 306 text: The reply text 307 reply_to_uri: The URI of the post being replied to (parent) 308 reply_to_cid: The CID of the post being replied to (parent) 309 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri 310 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid 311 lang: Language code for the post (e.g., 'en-US', 'es', 'ja') 312 correlation_id: Unique ID for tracking this message through the pipeline 313 314 Returns: 315 The response from sending the post 316 """ 317 import re 318 319 # Generate correlation ID if not provided 320 if correlation_id is None: 321 correlation_id = str(uuid.uuid4())[:8] 322 323 # Enhanced logging with structured data 324 logger.info(f"[{correlation_id}] Starting reply_to_post", extra={ 325 'correlation_id': correlation_id, 326 'text_length': len(text), 327 'text_preview': text[:100] + '...' 
if len(text) > 100 else text, 328 'reply_to_uri': reply_to_uri, 329 'root_uri': root_uri, 330 'lang': lang 331 }) 332 333 start_time = time.time() 334 335 # If root is not provided, this is a reply to the root post 336 if root_uri is None: 337 root_uri = reply_to_uri 338 root_cid = reply_to_cid 339 340 # Create references for the reply 341 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid)) 342 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid)) 343 344 # Parse rich text facets (mentions and URLs) 345 facets = [] 346 text_bytes = text.encode("UTF-8") 347 mentions_found = [] 348 urls_found = [] 349 350 logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)") 351 352 # Parse mentions - fixed to handle @ at start of text 353 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 354 355 for m in re.finditer(mention_regex, text_bytes): 356 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 357 mentions_found.append(handle) 358 # Adjust byte positions to account for the optional prefix 359 mention_start = m.start(1) 360 mention_end = m.end(1) 361 try: 362 # Resolve handle to DID using the API 363 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle}) 364 if resolve_resp and hasattr(resolve_resp, 'did'): 365 facets.append( 366 models.AppBskyRichtextFacet.Main( 367 index=models.AppBskyRichtextFacet.ByteSlice( 368 byteStart=mention_start, 369 byteEnd=mention_end 370 ), 371 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)] 372 ) 373 ) 374 logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}") 375 except Exception as e: 376 logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}") 377 continue 378 379 # Parse URLs - fixed to handle URLs at start of text 380 url_regex = 
rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 381 382 for m in re.finditer(url_regex, text_bytes): 383 url = m.group(1).decode("UTF-8") 384 urls_found.append(url) 385 # Adjust byte positions to account for the optional prefix 386 url_start = m.start(1) 387 url_end = m.end(1) 388 facets.append( 389 models.AppBskyRichtextFacet.Main( 390 index=models.AppBskyRichtextFacet.ByteSlice( 391 byteStart=url_start, 392 byteEnd=url_end 393 ), 394 features=[models.AppBskyRichtextFacet.Link(uri=url)] 395 ) 396 ) 397 logger.debug(f"[{correlation_id}] Found URL: {url}") 398 399 # Parse hashtags 400 hashtag_regex = rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)" 401 hashtags_found = [] 402 403 for m in re.finditer(hashtag_regex, text_bytes): 404 tag = m.group(1).decode("UTF-8") # Get tag without # prefix 405 hashtags_found.append(tag) 406 # Get byte positions for the entire hashtag including # 407 tag_start = m.start(0) 408 # Adjust start if there's a space/prefix 409 if text_bytes[tag_start:tag_start+1] in (b' ', b'$'): 410 tag_start += 1 411 tag_end = m.end(0) 412 facets.append( 413 models.AppBskyRichtextFacet.Main( 414 index=models.AppBskyRichtextFacet.ByteSlice( 415 byteStart=tag_start, 416 byteEnd=tag_end 417 ), 418 features=[models.AppBskyRichtextFacet.Tag(tag=tag)] 419 ) 420 ) 421 logger.debug(f"[{correlation_id}] Found hashtag: #{tag}") 422 423 logger.debug(f"[{correlation_id}] Facet parsing complete", extra={ 424 'correlation_id': correlation_id, 425 'mentions_count': len(mentions_found), 426 'mentions': mentions_found, 427 'urls_count': len(urls_found), 428 'urls': urls_found, 429 'hashtags_count': len(hashtags_found), 430 'hashtags': hashtags_found, 431 'total_facets': len(facets) 432 }) 433 434 # Send the reply with facets if any were found 435 logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={ 436 'correlation_id': correlation_id, 437 'has_facets': bool(facets), 438 
'facet_count': len(facets), 439 'lang': lang 440 }) 441 442 try: 443 if facets: 444 response = client.send_post( 445 text=text, 446 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 447 facets=facets, 448 langs=[lang] if lang else None 449 ) 450 else: 451 response = client.send_post( 452 text=text, 453 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 454 langs=[lang] if lang else None 455 ) 456 457 # Calculate response time 458 response_time = time.time() - start_time 459 460 # Extract post URL for user-friendly logging 461 post_url = None 462 if hasattr(response, 'uri') and response.uri: 463 # Convert AT-URI to web URL 464 # Format: at://did:plc:xxx/app.bsky.feed.post/xxx -> https://bsky.app/profile/handle/post/xxx 465 try: 466 uri_parts = response.uri.split('/') 467 if len(uri_parts) >= 4 and uri_parts[3] == 'app.bsky.feed.post': 468 rkey = uri_parts[4] 469 # We'd need to resolve DID to handle, but for now just use the URI 470 post_url = f"bsky://post/{rkey}" 471 except: 472 pass 473 474 logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" + 475 (f" - URL: {post_url}" if post_url else ""), extra={ 476 'correlation_id': correlation_id, 477 'response_time': round(response_time, 3), 478 'post_uri': response.uri, 479 'post_url': post_url, 480 'post_cid': getattr(response, 'cid', None), 481 'text_length': len(text) 482 }) 483 484 return response 485 486 except Exception as e: 487 response_time = time.time() - start_time 488 logger.error(f"[{correlation_id}] Failed to send reply", extra={ 489 'correlation_id': correlation_id, 490 'error': str(e), 491 'error_type': type(e).__name__, 492 'response_time': round(response_time, 3), 493 'text_length': len(text) 494 }) 495 raise 496 497 498def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]: 499 """ 500 Get the thread containing a post to find root post information. 
501 502 Args: 503 client: Authenticated Bluesky client 504 uri: The URI of the post 505 506 Returns: 507 The thread data or None if not found 508 """ 509 try: 510 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10}) 511 return thread 512 except Exception as e: 513 logger.error(f"Error fetching post thread: {e}") 514 return None 515 516 517def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 518 """ 519 Reply to a notification (mention or reply). 520 521 Args: 522 client: Authenticated Bluesky client 523 notification: The notification object from list_notifications 524 reply_text: The text to reply with 525 lang: Language code for the post (defaults to "en-US") 526 correlation_id: Unique ID for tracking this message through the pipeline 527 528 Returns: 529 The response from sending the reply or None if failed 530 """ 531 # Generate correlation ID if not provided 532 if correlation_id is None: 533 correlation_id = str(uuid.uuid4())[:8] 534 535 logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={ 536 'correlation_id': correlation_id, 537 'reply_length': len(reply_text), 538 'lang': lang 539 }) 540 541 try: 542 # Get the post URI and CID from the notification (handle both dict and object) 543 if isinstance(notification, dict): 544 post_uri = notification.get('uri') 545 post_cid = notification.get('cid') 546 # Check if the notification record has reply info with root 547 record = notification.get('record', {}) 548 reply_info = record.get('reply') if isinstance(record, dict) else None 549 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 550 post_uri = notification.uri 551 post_cid = notification.cid 552 # Check if the notification record has reply info with root 553 reply_info = None 554 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 555 
reply_info = notification.record.reply 556 else: 557 post_uri = None 558 post_cid = None 559 reply_info = None 560 561 if not post_uri or not post_cid: 562 logger.error("Notification doesn't have required uri/cid fields") 563 return None 564 565 # Determine root: if post has reply info, use its root; otherwise this post IS the root 566 if reply_info: 567 # Extract root from the notification's reply structure 568 if isinstance(reply_info, dict): 569 root_ref = reply_info.get('root') 570 if root_ref and isinstance(root_ref, dict): 571 root_uri = root_ref.get('uri', post_uri) 572 root_cid = root_ref.get('cid', post_cid) 573 else: 574 # No root in reply info, use post as root 575 root_uri = post_uri 576 root_cid = post_cid 577 elif hasattr(reply_info, 'root'): 578 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 579 root_uri = reply_info.root.uri 580 root_cid = reply_info.root.cid 581 else: 582 root_uri = post_uri 583 root_cid = post_cid 584 else: 585 root_uri = post_uri 586 root_cid = post_cid 587 else: 588 # No reply info means this post IS the root 589 root_uri = post_uri 590 root_cid = post_cid 591 592 # Reply to the notification 593 return reply_to_post( 594 client=client, 595 text=reply_text, 596 reply_to_uri=post_uri, 597 reply_to_cid=post_cid, 598 root_uri=root_uri, 599 root_cid=root_cid, 600 lang=lang, 601 correlation_id=correlation_id 602 ) 603 604 except Exception as e: 605 logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={ 606 'correlation_id': correlation_id, 607 'error': str(e), 608 'error_type': type(e).__name__ 609 }) 610 return None 611 612 613def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]: 614 """ 615 Reply to a notification with a threaded chain of messages (max 15). 
616 617 Args: 618 client: Authenticated Bluesky client 619 notification: The notification object from list_notifications 620 reply_messages: List of reply texts (max 15 messages, each max 300 chars) 621 lang: Language code for the posts (defaults to "en-US") 622 correlation_id: Unique ID for tracking this message through the pipeline 623 624 Returns: 625 List of responses from sending the replies or None if failed 626 """ 627 # Generate correlation ID if not provided 628 if correlation_id is None: 629 correlation_id = str(uuid.uuid4())[:8] 630 631 logger.info(f"[{correlation_id}] Starting threaded reply", extra={ 632 'correlation_id': correlation_id, 633 'message_count': len(reply_messages), 634 'total_length': sum(len(msg) for msg in reply_messages), 635 'lang': lang 636 }) 637 638 try: 639 # Validate input 640 if not reply_messages or len(reply_messages) == 0: 641 logger.error(f"[{correlation_id}] Reply messages list cannot be empty") 642 return None 643 if len(reply_messages) > 15: 644 logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})") 645 return None 646 647 # Get the post URI and CID from the notification (handle both dict and object) 648 if isinstance(notification, dict): 649 post_uri = notification.get('uri') 650 post_cid = notification.get('cid') 651 # Check if the notification record has reply info with root 652 record = notification.get('record', {}) 653 reply_info = record.get('reply') if isinstance(record, dict) else None 654 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 655 post_uri = notification.uri 656 post_cid = notification.cid 657 # Check if the notification record has reply info with root 658 reply_info = None 659 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 660 reply_info = notification.record.reply 661 else: 662 post_uri = None 663 post_cid = None 664 reply_info = None 665 666 if not post_uri or not post_cid: 667 
logger.error("Notification doesn't have required uri/cid fields") 668 return None 669 670 # Determine root: if post has reply info, use its root; otherwise this post IS the root 671 if reply_info: 672 # Extract root from the notification's reply structure 673 if isinstance(reply_info, dict): 674 root_ref = reply_info.get('root') 675 if root_ref and isinstance(root_ref, dict): 676 root_uri = root_ref.get('uri', post_uri) 677 root_cid = root_ref.get('cid', post_cid) 678 else: 679 # No root in reply info, use post as root 680 root_uri = post_uri 681 root_cid = post_cid 682 elif hasattr(reply_info, 'root'): 683 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 684 root_uri = reply_info.root.uri 685 root_cid = reply_info.root.cid 686 else: 687 root_uri = post_uri 688 root_cid = post_cid 689 else: 690 root_uri = post_uri 691 root_cid = post_cid 692 else: 693 # No reply info means this post IS the root 694 root_uri = post_uri 695 root_cid = post_cid 696 697 # Send replies in sequence, creating a thread 698 responses = [] 699 current_parent_uri = post_uri 700 current_parent_cid = post_cid 701 702 for i, message in enumerate(reply_messages): 703 thread_correlation_id = f"{correlation_id}-{i+1}" 704 logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...") 705 706 # Send this reply 707 response = reply_to_post( 708 client=client, 709 text=message, 710 reply_to_uri=current_parent_uri, 711 reply_to_cid=current_parent_cid, 712 root_uri=root_uri, 713 root_cid=root_cid, 714 lang=lang, 715 correlation_id=thread_correlation_id 716 ) 717 718 if not response: 719 logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message") 720 # Try to post a system failure message 721 failure_response = reply_to_post( 722 client=client, 723 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]", 724 reply_to_uri=current_parent_uri, 725 reply_to_cid=current_parent_cid, 726 
root_uri=root_uri, 727 root_cid=root_cid, 728 lang=lang, 729 correlation_id=f"{thread_correlation_id}-FAIL" 730 ) 731 if failure_response: 732 responses.append(failure_response) 733 current_parent_uri = failure_response.uri 734 current_parent_cid = failure_response.cid 735 else: 736 logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread") 737 return responses if responses else None 738 else: 739 responses.append(response) 740 # Update parent references for next reply (if any) 741 if i < len(reply_messages) - 1: # Not the last message 742 current_parent_uri = response.uri 743 current_parent_cid = response.cid 744 745 logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={ 746 'correlation_id': correlation_id, 747 'replies_sent': len(responses), 748 'replies_requested': len(reply_messages) 749 }) 750 return responses 751 752 except Exception as e: 753 logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={ 754 'correlation_id': correlation_id, 755 'error': str(e), 756 'error_type': type(e).__name__, 757 'message_count': len(reply_messages) 758 }) 759 return None 760 761 762def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]: 763 """ 764 Create a stream.thought.ack record for synthesis without a target post. 765 766 This creates a synthesis acknowledgment with null subject field. 
767 768 Args: 769 client: Authenticated Bluesky client 770 note: The synthesis note/content 771 772 Returns: 773 The response from creating the acknowledgment record or None if failed 774 """ 775 try: 776 import requests 777 import json 778 from datetime import datetime, timezone 779 780 # Get session info from the client 781 access_token = None 782 user_did = None 783 784 # Try different ways to get the session info 785 if hasattr(client, '_session') and client._session: 786 access_token = client._session.access_jwt 787 user_did = client._session.did 788 elif hasattr(client, 'access_jwt'): 789 access_token = client.access_jwt 790 user_did = client.did if hasattr(client, 'did') else None 791 else: 792 logger.error("Cannot access client session information") 793 return None 794 795 if not access_token or not user_did: 796 logger.error("Missing access token or DID from session") 797 return None 798 799 # Get PDS URI from config instead of environment variables 800 from config_loader import get_bluesky_config 801 bluesky_config = get_bluesky_config() 802 pds_host = bluesky_config['pds_uri'] 803 804 # Create acknowledgment record with null subject 805 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 806 ack_record = { 807 "$type": "stream.thought.ack", 808 "subject": None, # Null subject for synthesis 809 "createdAt": now, 810 "note": note 811 } 812 813 # Create the record 814 headers = {"Authorization": f"Bearer {access_token}"} 815 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 816 817 create_data = { 818 "repo": user_did, 819 "collection": "stream.thought.ack", 820 "record": ack_record 821 } 822 823 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 824 response.raise_for_status() 825 result = response.json() 826 827 logger.info(f"Successfully created synthesis acknowledgment") 828 return result 829 830 except Exception as e: 831 logger.error(f"Error creating synthesis acknowledgment: 
{e}") 832 return None 833 834 835def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]: 836 """ 837 Create a stream.thought.ack record to acknowledge a post. 838 839 This creates a custom acknowledgment record instead of a standard Bluesky like, 840 allowing void to track which posts it has engaged with. 841 842 Args: 843 client: Authenticated Bluesky client 844 post_uri: The URI of the post to acknowledge 845 post_cid: The CID of the post to acknowledge 846 note: Optional note to attach to the acknowledgment 847 848 Returns: 849 The response from creating the acknowledgment record or None if failed 850 """ 851 try: 852 import requests 853 import json 854 from datetime import datetime, timezone 855 856 # Get session info from the client 857 # The atproto Client stores the session differently 858 access_token = None 859 user_did = None 860 861 # Try different ways to get the session info 862 if hasattr(client, '_session') and client._session: 863 access_token = client._session.access_jwt 864 user_did = client._session.did 865 elif hasattr(client, 'access_jwt'): 866 access_token = client.access_jwt 867 user_did = client.did if hasattr(client, 'did') else None 868 else: 869 logger.error("Cannot access client session information") 870 return None 871 872 if not access_token or not user_did: 873 logger.error("Missing access token or DID from session") 874 return None 875 876 # Get PDS URI from config instead of environment variables 877 from config_loader import get_bluesky_config 878 bluesky_config = get_bluesky_config() 879 pds_host = bluesky_config['pds_uri'] 880 881 # Create acknowledgment record with stream.thought.ack type 882 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 883 ack_record = { 884 "$type": "stream.thought.ack", 885 "subject": { 886 "uri": post_uri, 887 "cid": post_cid 888 }, 889 "createdAt": now, 890 "note": note # Will be null if no note provided 891 } 892 893 
# Create the record 894 headers = {"Authorization": f"Bearer {access_token}"} 895 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 896 897 create_data = { 898 "repo": user_did, 899 "collection": "stream.thought.ack", 900 "record": ack_record 901 } 902 903 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 904 response.raise_for_status() 905 result = response.json() 906 907 logger.info(f"Successfully acknowledged post: {post_uri}") 908 return result 909 910 except Exception as e: 911 logger.error(f"Error acknowledging post: {e}") 912 return None 913 914 915def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 916 """ 917 Create a stream.thought.tool_call record to track tool usage. 918 919 This creates a record of tool calls made by void during processing, 920 allowing for analysis of tool usage patterns and debugging. 921 922 Args: 923 client: Authenticated Bluesky client 924 tool_name: Name of the tool being called 925 arguments: Raw JSON string of the tool arguments 926 tool_call_id: Optional ID of the tool call for correlation 927 928 Returns: 929 The response from creating the tool call record or None if failed 930 """ 931 try: 932 import requests 933 import json 934 from datetime import datetime, timezone 935 936 # Get session info from the client 937 access_token = None 938 user_did = None 939 940 # Try different ways to get the session info 941 if hasattr(client, '_session') and client._session: 942 access_token = client._session.access_jwt 943 user_did = client._session.did 944 elif hasattr(client, 'access_jwt'): 945 access_token = client.access_jwt 946 user_did = client.did if hasattr(client, 'did') else None 947 else: 948 logger.error("Cannot access client session information") 949 return None 950 951 if not access_token or not user_did: 952 logger.error("Missing access token or DID from session") 953 return 
None 954 955 # Get PDS URI from config instead of environment variables 956 from config_loader import get_bluesky_config 957 bluesky_config = get_bluesky_config() 958 pds_host = bluesky_config['pds_uri'] 959 960 # Create tool call record 961 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 962 tool_record = { 963 "$type": "stream.thought.tool.call", 964 "tool_name": tool_name, 965 "arguments": arguments, # Store as string to avoid parsing issues 966 "createdAt": now 967 } 968 969 # Add tool_call_id if provided 970 if tool_call_id: 971 tool_record["tool_call_id"] = tool_call_id 972 973 # Create the record 974 headers = {"Authorization": f"Bearer {access_token}"} 975 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 976 977 create_data = { 978 "repo": user_did, 979 "collection": "stream.thought.tool.call", 980 "record": tool_record 981 } 982 983 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 984 if response.status_code != 200: 985 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}") 986 response.raise_for_status() 987 result = response.json() 988 989 logger.debug(f"Successfully recorded tool call: {tool_name}") 990 return result 991 992 except Exception as e: 993 logger.error(f"Error creating tool call record: {e}") 994 return None 995 996 997def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]: 998 """ 999 Create a stream.thought.reasoning record to track agent reasoning. 1000 1001 This creates a record of void's reasoning during message processing, 1002 providing transparency into the decision-making process. 
1003 1004 Args: 1005 client: Authenticated Bluesky client 1006 reasoning_text: The reasoning text from the agent 1007 1008 Returns: 1009 The response from creating the reasoning record or None if failed 1010 """ 1011 try: 1012 import requests 1013 import json 1014 from datetime import datetime, timezone 1015 1016 # Get session info from the client 1017 access_token = None 1018 user_did = None 1019 1020 # Try different ways to get the session info 1021 if hasattr(client, '_session') and client._session: 1022 access_token = client._session.access_jwt 1023 user_did = client._session.did 1024 elif hasattr(client, 'access_jwt'): 1025 access_token = client.access_jwt 1026 user_did = client.did if hasattr(client, 'did') else None 1027 else: 1028 logger.error("Cannot access client session information") 1029 return None 1030 1031 if not access_token or not user_did: 1032 logger.error("Missing access token or DID from session") 1033 return None 1034 1035 # Get PDS URI from config instead of environment variables 1036 from config_loader import get_bluesky_config 1037 bluesky_config = get_bluesky_config() 1038 pds_host = bluesky_config['pds_uri'] 1039 1040 # Create reasoning record 1041 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1042 reasoning_record = { 1043 "$type": "stream.thought.reasoning", 1044 "reasoning": reasoning_text, 1045 "createdAt": now 1046 } 1047 1048 # Create the record 1049 headers = {"Authorization": f"Bearer {access_token}"} 1050 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1051 1052 create_data = { 1053 "repo": user_did, 1054 "collection": "stream.thought.reasoning", 1055 "record": reasoning_record 1056 } 1057 1058 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1059 response.raise_for_status() 1060 result = response.json() 1061 1062 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)") 1063 return result 1064 1065 except Exception 
as e: 1066 logger.error(f"Error creating reasoning record: {e}") 1067 return None 1068 1069 1070def create_memory_record(client: Client, content: str, tags: Optional[List[str]] = None) -> Optional[Dict[str, Any]]: 1071 """ 1072 Create a stream.thought.memory record to store archival memory insertions. 1073 1074 This creates a record of archival_memory_insert tool calls, preserving 1075 important memories and context in the AT Protocol. 1076 1077 Args: 1078 client: Authenticated Bluesky client 1079 content: The memory content being archived 1080 tags: Optional list of tags associated with this memory 1081 1082 Returns: 1083 The response from creating the memory record or None if failed 1084 """ 1085 try: 1086 import requests 1087 import json 1088 from datetime import datetime, timezone 1089 1090 # Get session info from the client 1091 access_token = None 1092 user_did = None 1093 1094 # Try different ways to get the session info 1095 if hasattr(client, '_session') and client._session: 1096 access_token = client._session.access_jwt 1097 user_did = client._session.did 1098 elif hasattr(client, 'access_jwt'): 1099 access_token = client.access_jwt 1100 user_did = client.did if hasattr(client, 'did') else None 1101 else: 1102 logger.error("Cannot access client session information") 1103 return None 1104 1105 if not access_token or not user_did: 1106 logger.error("Missing access token or DID from session") 1107 return None 1108 1109 # Get PDS URI from config instead of environment variables 1110 from config_loader import get_bluesky_config 1111 bluesky_config = get_bluesky_config() 1112 pds_host = bluesky_config['pds_uri'] 1113 1114 # Create memory record 1115 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1116 memory_record = { 1117 "$type": "stream.thought.memory", 1118 "content": content, 1119 "createdAt": now 1120 } 1121 1122 # Add tags if provided (can be null) 1123 if tags is not None: 1124 memory_record["tags"] = tags 1125 1126 # Create the 
record 1127 headers = {"Authorization": f"Bearer {access_token}"} 1128 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1129 1130 create_data = { 1131 "repo": user_did, 1132 "collection": "stream.thought.memory", 1133 "record": memory_record 1134 } 1135 1136 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1137 response.raise_for_status() 1138 result = response.json() 1139 1140 tags_info = f" with {len(tags)} tags" if tags else " (no tags)" 1141 logger.debug(f"Successfully recorded memory (length: {len(content)} chars{tags_info})") 1142 return result 1143 1144 except Exception as e: 1145 logger.error(f"Error creating memory record: {e}") 1146 return None 1147 1148 1149def sync_followers(client: Client, dry_run: bool = False) -> Dict[str, Any]: 1150 """ 1151 Check who is following the bot and who the bot is following, 1152 then follow back users who aren't already followed. 1153 1154 This implements the autofollow feature by creating follow records 1155 (app.bsky.graph.follow) for users who follow the bot. 
1156 1157 Args: 1158 client: Authenticated Bluesky client 1159 dry_run: If True, only report what would be done without actually following 1160 1161 Returns: 1162 Dict with stats: { 1163 'followers_count': int, 1164 'following_count': int, 1165 'to_follow': List[str], # List of handles to follow 1166 'newly_followed': List[str], # List of handles actually followed (empty if dry_run) 1167 'errors': List[str] # Any errors encountered 1168 } 1169 """ 1170 try: 1171 from datetime import datetime, timezone 1172 1173 # Get session info from the client 1174 access_token = None 1175 user_did = None 1176 1177 if hasattr(client, '_session') and client._session: 1178 access_token = client._session.access_jwt 1179 user_did = client._session.did 1180 elif hasattr(client, 'access_jwt'): 1181 access_token = client.access_jwt 1182 user_did = client.did if hasattr(client, 'did') else None 1183 else: 1184 logger.error("Cannot access client session information") 1185 return {'error': 'Cannot access client session'} 1186 1187 if not access_token or not user_did: 1188 logger.error("Missing access token or DID from session") 1189 return {'error': 'Missing access token or DID'} 1190 1191 # Get PDS URI from config 1192 from config_loader import get_bluesky_config 1193 bluesky_config = get_bluesky_config() 1194 pds_host = bluesky_config['pds_uri'] 1195 1196 # Get followers using the API 1197 followers_response = client.app.bsky.graph.get_followers({'actor': user_did}) 1198 followers = followers_response.followers if hasattr(followers_response, 'followers') else [] 1199 follower_dids = {f.did for f in followers} 1200 1201 # Get following using the API 1202 following_response = client.app.bsky.graph.get_follows({'actor': user_did}) 1203 following = following_response.follows if hasattr(following_response, 'follows') else [] 1204 following_dids = {f.did for f in following} 1205 1206 # Find users who follow us but we don't follow back 1207 to_follow_dids = follower_dids - following_dids 1208 
1209 # Build result object 1210 result = { 1211 'followers_count': len(followers), 1212 'following_count': len(following), 1213 'to_follow': [], 1214 'newly_followed': [], 1215 'errors': [] 1216 } 1217 1218 # Get handles for users to follow 1219 to_follow_handles = [] 1220 for follower in followers: 1221 if follower.did in to_follow_dids: 1222 handle = follower.handle if hasattr(follower, 'handle') else follower.did 1223 to_follow_handles.append(handle) 1224 result['to_follow'].append(handle) 1225 1226 logger.info(f"Follower sync: {len(followers)} followers, {len(following)} following, {len(to_follow_dids)} to follow") 1227 1228 # If dry run, just return the stats 1229 if dry_run: 1230 logger.info(f"Dry run - would follow: {', '.join(to_follow_handles)}") 1231 return result 1232 1233 # Actually follow the users with rate limiting 1234 import requests 1235 headers = {"Authorization": f"Bearer {access_token}"} 1236 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1237 1238 for i, did in enumerate(to_follow_dids): 1239 try: 1240 # Rate limiting: wait 2 seconds between follows to avoid spamming the server 1241 if i > 0: 1242 time.sleep(2) 1243 1244 # Create follow record 1245 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1246 follow_record = { 1247 "$type": "app.bsky.graph.follow", 1248 "subject": did, 1249 "createdAt": now 1250 } 1251 1252 create_data = { 1253 "repo": user_did, 1254 "collection": "app.bsky.graph.follow", 1255 "record": follow_record 1256 } 1257 1258 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1259 response.raise_for_status() 1260 1261 # Find handle for this DID 1262 handle = next((f.handle for f in followers if f.did == did), did) 1263 result['newly_followed'].append(handle) 1264 logger.info(f"Followed: {handle}") 1265 1266 except Exception as e: 1267 error_msg = f"Failed to follow {did}: {e}" 1268 logger.error(error_msg) 1269 result['errors'].append(error_msg) 
1270 1271 return result 1272 1273 except Exception as e: 1274 logger.error(f"Error syncing followers: {e}") 1275 return {'error': str(e)} 1276 1277 1278if __name__ == "__main__": 1279 client = default_login() 1280 # do something with the client 1281 logger.info("Client is ready to use!")