A digital person for Bluesky.
1import os 2import logging 3import uuid 4import time 5from typing import Optional, Dict, Any, List 6from atproto_client import Client, Session, SessionEvent, models 7 8# Configure logging 9logging.basicConfig( 10 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 11) 12logger = logging.getLogger("bluesky_session_handler") 13 14# Load the environment variables 15import dotenv 16dotenv.load_dotenv(override=True) 17 18import yaml 19import json 20 21# Strip fields. A list of fields to remove from a JSON object 22STRIP_FIELDS = [ 23 "cid", 24 "rev", 25 "did", 26 "uri", 27 "langs", 28 "threadgate", 29 "py_type", 30 "labels", 31 "avatar", 32 "viewer", 33 "indexed_at", 34 "tags", 35 "associated", 36 "thread_context", 37 "aspect_ratio", 38 "thumb", 39 "fullsize", 40 "root", 41 "created_at", 42 "verification", 43 "like_count", 44 "quote_count", 45 "reply_count", 46 "repost_count", 47 "embedding_disabled", 48 "thread_muted", 49 "reply_disabled", 50 "pinned", 51 "like", 52 "repost", 53 "blocked_by", 54 "blocking", 55 "blocking_by_list", 56 "followed_by", 57 "following", 58 "known_followers", 59 "muted", 60 "muted_by_list", 61 "root_author_like", 62 "entities", 63 "ref", 64 "mime_type", 65 "size", 66] 67def convert_to_basic_types(obj): 68 """Convert complex Python objects to basic types for JSON/YAML serialization.""" 69 if hasattr(obj, '__dict__'): 70 # Convert objects with __dict__ to their dictionary representation 71 return convert_to_basic_types(obj.__dict__) 72 elif isinstance(obj, dict): 73 return {key: convert_to_basic_types(value) for key, value in obj.items()} 74 elif isinstance(obj, list): 75 return [convert_to_basic_types(item) for item in obj] 76 elif isinstance(obj, (str, int, float, bool)) or obj is None: 77 return obj 78 else: 79 # For other types, try to convert to string 80 return str(obj) 81 82 83def strip_fields(obj, strip_field_list): 84 """Recursively strip fields from a JSON object.""" 85 if isinstance(obj, dict): 86 
keys_flagged_for_removal = [] 87 88 # Remove fields from strip list and pydantic metadata 89 for field in list(obj.keys()): 90 if field in strip_field_list or field.startswith("__"): 91 keys_flagged_for_removal.append(field) 92 93 # Remove flagged keys 94 for key in keys_flagged_for_removal: 95 obj.pop(key, None) 96 97 # Recursively process remaining values 98 for key, value in list(obj.items()): 99 obj[key] = strip_fields(value, strip_field_list) 100 # Remove empty/null values after processing 101 if ( 102 obj[key] is None 103 or (isinstance(obj[key], dict) and len(obj[key]) == 0) 104 or (isinstance(obj[key], list) and len(obj[key]) == 0) 105 or (isinstance(obj[key], str) and obj[key].strip() == "") 106 ): 107 obj.pop(key, None) 108 109 elif isinstance(obj, list): 110 for i, value in enumerate(obj): 111 obj[i] = strip_fields(value, strip_field_list) 112 # Remove None values from list 113 obj[:] = [item for item in obj if item is not None] 114 115 return obj 116 117 118def flatten_thread_structure(thread_data): 119 """ 120 Flatten a nested thread structure into a list while preserving all data. 
121 122 Args: 123 thread_data: The thread data from get_post_thread 124 125 Returns: 126 Dict with 'posts' key containing a list of posts in chronological order 127 """ 128 posts = [] 129 130 def traverse_thread(node): 131 """Recursively traverse the thread structure to collect posts.""" 132 if not node: 133 return 134 135 # If this node has a parent, traverse it first (to maintain chronological order) 136 if hasattr(node, 'parent') and node.parent: 137 traverse_thread(node.parent) 138 139 # Then add this node's post 140 if hasattr(node, 'post') and node.post: 141 # Convert to dict if needed to ensure we can process it 142 if hasattr(node.post, '__dict__'): 143 post_dict = node.post.__dict__.copy() 144 elif isinstance(node.post, dict): 145 post_dict = node.post.copy() 146 else: 147 post_dict = {} 148 149 posts.append(post_dict) 150 151 # Handle the thread structure 152 if hasattr(thread_data, 'thread'): 153 # Start from the main thread node 154 traverse_thread(thread_data.thread) 155 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__: 156 traverse_thread(thread_data.__dict__['thread']) 157 158 # Return a simple structure with posts list 159 return {'posts': posts} 160 161 162def thread_to_yaml_string(thread, strip_metadata=True): 163 """ 164 Convert thread data to a YAML-formatted string for LLM parsing. 
165 166 Args: 167 thread: The thread data from get_post_thread 168 strip_metadata: Whether to strip metadata fields for cleaner output 169 170 Returns: 171 YAML-formatted string representation of the thread 172 """ 173 # First flatten the thread structure to avoid deep nesting 174 flattened = flatten_thread_structure(thread) 175 176 # Convert complex objects to basic types 177 basic_thread = convert_to_basic_types(flattened) 178 179 if strip_metadata: 180 # Create a copy and strip unwanted fields 181 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS) 182 else: 183 cleaned_thread = basic_thread 184 185 return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False) 186 187 188 189 190 191 192 193def get_session(username: str) -> Optional[str]: 194 try: 195 with open(f"session_{username}.txt", encoding="UTF-8") as f: 196 return f.read() 197 except FileNotFoundError: 198 logger.debug(f"No existing session found for {username}") 199 return None 200 201def save_session(username: str, session_string: str) -> None: 202 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f: 203 f.write(session_string) 204 logger.debug(f"Session saved for {username}") 205 206def on_session_change(username: str, event: SessionEvent, session: Session) -> None: 207 logger.debug(f"Session changed: {event} {repr(session)}") 208 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 209 logger.debug(f"Saving changed session for {username}") 210 save_session(username, session.export()) 211 212def init_client(username: str, password: str) -> Client: 213 pds_uri = os.getenv("PDS_URI") 214 if pds_uri is None: 215 logger.warning( 216 "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable." 
217 ) 218 pds_uri = "https://bsky.social" 219 220 # Print the PDS URI 221 logger.debug(f"Using PDS URI: {pds_uri}") 222 223 client = Client(pds_uri) 224 client.on_session_change( 225 lambda event, session: on_session_change(username, event, session) 226 ) 227 228 session_string = get_session(username) 229 if session_string: 230 logger.debug(f"Reusing existing session for {username}") 231 client.login(session_string=session_string) 232 else: 233 logger.debug(f"Creating new session for {username}") 234 client.login(username, password) 235 236 return client 237 238 239def default_login() -> Client: 240 """Login using configuration from config.yaml or environment variables.""" 241 try: 242 from config_loader import get_bluesky_config 243 bluesky_config = get_bluesky_config() 244 245 username = bluesky_config['username'] 246 password = bluesky_config['password'] 247 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social') 248 249 logger.info(f"Logging into Bluesky as {username} via {pds_uri}") 250 251 # Use pds_uri from config 252 client = Client(base_url=pds_uri) 253 client.login(username, password) 254 return client 255 256 except Exception as e: 257 logger.error(f"Failed to load Bluesky configuration: {e}") 258 logger.error("Please check your config.yaml file or environment variables") 259 exit(1) 260 261def remove_outside_quotes(text: str) -> str: 262 """ 263 Remove outside double quotes from response text. 
264 265 Only handles double quotes to avoid interfering with contractions: 266 - Double quotes: "text" → text 267 - Preserves single quotes and internal quotes 268 269 Args: 270 text: The text to process 271 272 Returns: 273 Text with outside double quotes removed 274 """ 275 if not text or len(text) < 2: 276 return text 277 278 text = text.strip() 279 280 # Only remove double quotes from start and end 281 if text.startswith('"') and text.endswith('"'): 282 return text[1:-1] 283 284 return text 285 286def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]: 287 """ 288 Reply to a post on Bluesky with rich text support. 289 290 Args: 291 client: Authenticated Bluesky client 292 text: The reply text 293 reply_to_uri: The URI of the post being replied to (parent) 294 reply_to_cid: The CID of the post being replied to (parent) 295 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri 296 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid 297 lang: Language code for the post (e.g., 'en-US', 'es', 'ja') 298 correlation_id: Unique ID for tracking this message through the pipeline 299 300 Returns: 301 The response from sending the post 302 """ 303 import re 304 305 # Generate correlation ID if not provided 306 if correlation_id is None: 307 correlation_id = str(uuid.uuid4())[:8] 308 309 # Enhanced logging with structured data 310 logger.info(f"[{correlation_id}] Starting reply_to_post", extra={ 311 'correlation_id': correlation_id, 312 'text_length': len(text), 313 'text_preview': text[:100] + '...' 
if len(text) > 100 else text, 314 'reply_to_uri': reply_to_uri, 315 'root_uri': root_uri, 316 'lang': lang 317 }) 318 319 start_time = time.time() 320 321 # If root is not provided, this is a reply to the root post 322 if root_uri is None: 323 root_uri = reply_to_uri 324 root_cid = reply_to_cid 325 326 # Create references for the reply 327 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid)) 328 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid)) 329 330 # Parse rich text facets (mentions and URLs) 331 facets = [] 332 text_bytes = text.encode("UTF-8") 333 mentions_found = [] 334 urls_found = [] 335 336 logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)") 337 338 # Parse mentions - fixed to handle @ at start of text 339 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 340 341 for m in re.finditer(mention_regex, text_bytes): 342 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 343 mentions_found.append(handle) 344 # Adjust byte positions to account for the optional prefix 345 mention_start = m.start(1) 346 mention_end = m.end(1) 347 try: 348 # Resolve handle to DID using the API 349 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle}) 350 if resolve_resp and hasattr(resolve_resp, 'did'): 351 facets.append( 352 models.AppBskyRichtextFacet.Main( 353 index=models.AppBskyRichtextFacet.ByteSlice( 354 byteStart=mention_start, 355 byteEnd=mention_end 356 ), 357 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)] 358 ) 359 ) 360 logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}") 361 except Exception as e: 362 logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}") 363 continue 364 365 # Parse URLs - fixed to handle URLs at start of text 366 url_regex = 
rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 367 368 for m in re.finditer(url_regex, text_bytes): 369 url = m.group(1).decode("UTF-8") 370 urls_found.append(url) 371 # Adjust byte positions to account for the optional prefix 372 url_start = m.start(1) 373 url_end = m.end(1) 374 facets.append( 375 models.AppBskyRichtextFacet.Main( 376 index=models.AppBskyRichtextFacet.ByteSlice( 377 byteStart=url_start, 378 byteEnd=url_end 379 ), 380 features=[models.AppBskyRichtextFacet.Link(uri=url)] 381 ) 382 ) 383 logger.debug(f"[{correlation_id}] Found URL: {url}") 384 385 # Parse hashtags 386 hashtag_regex = rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)" 387 hashtags_found = [] 388 389 for m in re.finditer(hashtag_regex, text_bytes): 390 tag = m.group(1).decode("UTF-8") # Get tag without # prefix 391 hashtags_found.append(tag) 392 # Get byte positions for the entire hashtag including # 393 tag_start = m.start(0) 394 # Adjust start if there's a space/prefix 395 if text_bytes[tag_start:tag_start+1] in (b' ', b'$'): 396 tag_start += 1 397 tag_end = m.end(0) 398 facets.append( 399 models.AppBskyRichtextFacet.Main( 400 index=models.AppBskyRichtextFacet.ByteSlice( 401 byteStart=tag_start, 402 byteEnd=tag_end 403 ), 404 features=[models.AppBskyRichtextFacet.Tag(tag=tag)] 405 ) 406 ) 407 logger.debug(f"[{correlation_id}] Found hashtag: #{tag}") 408 409 logger.debug(f"[{correlation_id}] Facet parsing complete", extra={ 410 'correlation_id': correlation_id, 411 'mentions_count': len(mentions_found), 412 'mentions': mentions_found, 413 'urls_count': len(urls_found), 414 'urls': urls_found, 415 'hashtags_count': len(hashtags_found), 416 'hashtags': hashtags_found, 417 'total_facets': len(facets) 418 }) 419 420 # Send the reply with facets if any were found 421 logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={ 422 'correlation_id': correlation_id, 423 'has_facets': bool(facets), 424 
'facet_count': len(facets), 425 'lang': lang 426 }) 427 428 try: 429 if facets: 430 response = client.send_post( 431 text=text, 432 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 433 facets=facets, 434 langs=[lang] if lang else None 435 ) 436 else: 437 response = client.send_post( 438 text=text, 439 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 440 langs=[lang] if lang else None 441 ) 442 443 # Calculate response time 444 response_time = time.time() - start_time 445 446 # Extract post URL for user-friendly logging 447 post_url = None 448 if hasattr(response, 'uri') and response.uri: 449 # Convert AT-URI to web URL 450 # Format: at://did:plc:xxx/app.bsky.feed.post/xxx -> https://bsky.app/profile/handle/post/xxx 451 try: 452 uri_parts = response.uri.split('/') 453 if len(uri_parts) >= 4 and uri_parts[3] == 'app.bsky.feed.post': 454 rkey = uri_parts[4] 455 # We'd need to resolve DID to handle, but for now just use the URI 456 post_url = f"bsky://post/{rkey}" 457 except: 458 pass 459 460 logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" + 461 (f" - URL: {post_url}" if post_url else ""), extra={ 462 'correlation_id': correlation_id, 463 'response_time': round(response_time, 3), 464 'post_uri': response.uri, 465 'post_url': post_url, 466 'post_cid': getattr(response, 'cid', None), 467 'text_length': len(text) 468 }) 469 470 return response 471 472 except Exception as e: 473 response_time = time.time() - start_time 474 logger.error(f"[{correlation_id}] Failed to send reply", extra={ 475 'correlation_id': correlation_id, 476 'error': str(e), 477 'error_type': type(e).__name__, 478 'response_time': round(response_time, 3), 479 'text_length': len(text) 480 }) 481 raise 482 483 484def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]: 485 """ 486 Get the thread containing a post to find root post information. 
487 488 Args: 489 client: Authenticated Bluesky client 490 uri: The URI of the post 491 492 Returns: 493 The thread data or None if not found 494 """ 495 try: 496 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10}) 497 return thread 498 except Exception as e: 499 logger.error(f"Error fetching post thread: {e}") 500 return None 501 502 503def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 504 """ 505 Reply to a notification (mention or reply). 506 507 Args: 508 client: Authenticated Bluesky client 509 notification: The notification object from list_notifications 510 reply_text: The text to reply with 511 lang: Language code for the post (defaults to "en-US") 512 correlation_id: Unique ID for tracking this message through the pipeline 513 514 Returns: 515 The response from sending the reply or None if failed 516 """ 517 # Generate correlation ID if not provided 518 if correlation_id is None: 519 correlation_id = str(uuid.uuid4())[:8] 520 521 logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={ 522 'correlation_id': correlation_id, 523 'reply_length': len(reply_text), 524 'lang': lang 525 }) 526 527 try: 528 # Get the post URI and CID from the notification (handle both dict and object) 529 if isinstance(notification, dict): 530 post_uri = notification.get('uri') 531 post_cid = notification.get('cid') 532 # Check if the notification record has reply info with root 533 record = notification.get('record', {}) 534 reply_info = record.get('reply') if isinstance(record, dict) else None 535 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 536 post_uri = notification.uri 537 post_cid = notification.cid 538 # Check if the notification record has reply info with root 539 reply_info = None 540 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 541 
reply_info = notification.record.reply 542 else: 543 post_uri = None 544 post_cid = None 545 reply_info = None 546 547 if not post_uri or not post_cid: 548 logger.error("Notification doesn't have required uri/cid fields") 549 return None 550 551 # Determine root: if post has reply info, use its root; otherwise this post IS the root 552 if reply_info: 553 # Extract root from the notification's reply structure 554 if isinstance(reply_info, dict): 555 root_ref = reply_info.get('root') 556 if root_ref and isinstance(root_ref, dict): 557 root_uri = root_ref.get('uri', post_uri) 558 root_cid = root_ref.get('cid', post_cid) 559 else: 560 # No root in reply info, use post as root 561 root_uri = post_uri 562 root_cid = post_cid 563 elif hasattr(reply_info, 'root'): 564 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 565 root_uri = reply_info.root.uri 566 root_cid = reply_info.root.cid 567 else: 568 root_uri = post_uri 569 root_cid = post_cid 570 else: 571 root_uri = post_uri 572 root_cid = post_cid 573 else: 574 # No reply info means this post IS the root 575 root_uri = post_uri 576 root_cid = post_cid 577 578 # Reply to the notification 579 return reply_to_post( 580 client=client, 581 text=reply_text, 582 reply_to_uri=post_uri, 583 reply_to_cid=post_cid, 584 root_uri=root_uri, 585 root_cid=root_cid, 586 lang=lang, 587 correlation_id=correlation_id 588 ) 589 590 except Exception as e: 591 logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={ 592 'correlation_id': correlation_id, 593 'error': str(e), 594 'error_type': type(e).__name__ 595 }) 596 return None 597 598 599def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]: 600 """ 601 Reply to a notification with a threaded chain of messages (max 15). 
602 603 Args: 604 client: Authenticated Bluesky client 605 notification: The notification object from list_notifications 606 reply_messages: List of reply texts (max 15 messages, each max 300 chars) 607 lang: Language code for the posts (defaults to "en-US") 608 correlation_id: Unique ID for tracking this message through the pipeline 609 610 Returns: 611 List of responses from sending the replies or None if failed 612 """ 613 # Generate correlation ID if not provided 614 if correlation_id is None: 615 correlation_id = str(uuid.uuid4())[:8] 616 617 logger.info(f"[{correlation_id}] Starting threaded reply", extra={ 618 'correlation_id': correlation_id, 619 'message_count': len(reply_messages), 620 'total_length': sum(len(msg) for msg in reply_messages), 621 'lang': lang 622 }) 623 624 try: 625 # Validate input 626 if not reply_messages or len(reply_messages) == 0: 627 logger.error(f"[{correlation_id}] Reply messages list cannot be empty") 628 return None 629 if len(reply_messages) > 15: 630 logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})") 631 return None 632 633 # Get the post URI and CID from the notification (handle both dict and object) 634 if isinstance(notification, dict): 635 post_uri = notification.get('uri') 636 post_cid = notification.get('cid') 637 # Check if the notification record has reply info with root 638 record = notification.get('record', {}) 639 reply_info = record.get('reply') if isinstance(record, dict) else None 640 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 641 post_uri = notification.uri 642 post_cid = notification.cid 643 # Check if the notification record has reply info with root 644 reply_info = None 645 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 646 reply_info = notification.record.reply 647 else: 648 post_uri = None 649 post_cid = None 650 reply_info = None 651 652 if not post_uri or not post_cid: 653 
logger.error("Notification doesn't have required uri/cid fields") 654 return None 655 656 # Determine root: if post has reply info, use its root; otherwise this post IS the root 657 if reply_info: 658 # Extract root from the notification's reply structure 659 if isinstance(reply_info, dict): 660 root_ref = reply_info.get('root') 661 if root_ref and isinstance(root_ref, dict): 662 root_uri = root_ref.get('uri', post_uri) 663 root_cid = root_ref.get('cid', post_cid) 664 else: 665 # No root in reply info, use post as root 666 root_uri = post_uri 667 root_cid = post_cid 668 elif hasattr(reply_info, 'root'): 669 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 670 root_uri = reply_info.root.uri 671 root_cid = reply_info.root.cid 672 else: 673 root_uri = post_uri 674 root_cid = post_cid 675 else: 676 root_uri = post_uri 677 root_cid = post_cid 678 else: 679 # No reply info means this post IS the root 680 root_uri = post_uri 681 root_cid = post_cid 682 683 # Send replies in sequence, creating a thread 684 responses = [] 685 current_parent_uri = post_uri 686 current_parent_cid = post_cid 687 688 for i, message in enumerate(reply_messages): 689 thread_correlation_id = f"{correlation_id}-{i+1}" 690 logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...") 691 692 # Send this reply 693 response = reply_to_post( 694 client=client, 695 text=message, 696 reply_to_uri=current_parent_uri, 697 reply_to_cid=current_parent_cid, 698 root_uri=root_uri, 699 root_cid=root_cid, 700 lang=lang, 701 correlation_id=thread_correlation_id 702 ) 703 704 if not response: 705 logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message") 706 # Try to post a system failure message 707 failure_response = reply_to_post( 708 client=client, 709 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]", 710 reply_to_uri=current_parent_uri, 711 reply_to_cid=current_parent_cid, 712 
root_uri=root_uri, 713 root_cid=root_cid, 714 lang=lang, 715 correlation_id=f"{thread_correlation_id}-FAIL" 716 ) 717 if failure_response: 718 responses.append(failure_response) 719 current_parent_uri = failure_response.uri 720 current_parent_cid = failure_response.cid 721 else: 722 logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread") 723 return responses if responses else None 724 else: 725 responses.append(response) 726 # Update parent references for next reply (if any) 727 if i < len(reply_messages) - 1: # Not the last message 728 current_parent_uri = response.uri 729 current_parent_cid = response.cid 730 731 logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={ 732 'correlation_id': correlation_id, 733 'replies_sent': len(responses), 734 'replies_requested': len(reply_messages) 735 }) 736 return responses 737 738 except Exception as e: 739 logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={ 740 'correlation_id': correlation_id, 741 'error': str(e), 742 'error_type': type(e).__name__, 743 'message_count': len(reply_messages) 744 }) 745 return None 746 747 748def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]: 749 """ 750 Create a stream.thought.ack record for synthesis without a target post. 751 752 This creates a synthesis acknowledgment with null subject field. 
753 754 Args: 755 client: Authenticated Bluesky client 756 note: The synthesis note/content 757 758 Returns: 759 The response from creating the acknowledgment record or None if failed 760 """ 761 try: 762 import requests 763 import json 764 from datetime import datetime, timezone 765 766 # Get session info from the client 767 access_token = None 768 user_did = None 769 770 # Try different ways to get the session info 771 if hasattr(client, '_session') and client._session: 772 access_token = client._session.access_jwt 773 user_did = client._session.did 774 elif hasattr(client, 'access_jwt'): 775 access_token = client.access_jwt 776 user_did = client.did if hasattr(client, 'did') else None 777 else: 778 logger.error("Cannot access client session information") 779 return None 780 781 if not access_token or not user_did: 782 logger.error("Missing access token or DID from session") 783 return None 784 785 # Get PDS URI from config instead of environment variables 786 from config_loader import get_bluesky_config 787 bluesky_config = get_bluesky_config() 788 pds_host = bluesky_config['pds_uri'] 789 790 # Create acknowledgment record with null subject 791 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 792 ack_record = { 793 "$type": "stream.thought.ack", 794 "subject": None, # Null subject for synthesis 795 "createdAt": now, 796 "note": note 797 } 798 799 # Create the record 800 headers = {"Authorization": f"Bearer {access_token}"} 801 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 802 803 create_data = { 804 "repo": user_did, 805 "collection": "stream.thought.ack", 806 "record": ack_record 807 } 808 809 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 810 response.raise_for_status() 811 result = response.json() 812 813 logger.info(f"Successfully created synthesis acknowledgment") 814 return result 815 816 except Exception as e: 817 logger.error(f"Error creating synthesis acknowledgment: 
{e}") 818 return None 819 820 821def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]: 822 """ 823 Create a stream.thought.ack record to acknowledge a post. 824 825 This creates a custom acknowledgment record instead of a standard Bluesky like, 826 allowing void to track which posts it has engaged with. 827 828 Args: 829 client: Authenticated Bluesky client 830 post_uri: The URI of the post to acknowledge 831 post_cid: The CID of the post to acknowledge 832 note: Optional note to attach to the acknowledgment 833 834 Returns: 835 The response from creating the acknowledgment record or None if failed 836 """ 837 try: 838 import requests 839 import json 840 from datetime import datetime, timezone 841 842 # Get session info from the client 843 # The atproto Client stores the session differently 844 access_token = None 845 user_did = None 846 847 # Try different ways to get the session info 848 if hasattr(client, '_session') and client._session: 849 access_token = client._session.access_jwt 850 user_did = client._session.did 851 elif hasattr(client, 'access_jwt'): 852 access_token = client.access_jwt 853 user_did = client.did if hasattr(client, 'did') else None 854 else: 855 logger.error("Cannot access client session information") 856 return None 857 858 if not access_token or not user_did: 859 logger.error("Missing access token or DID from session") 860 return None 861 862 # Get PDS URI from config instead of environment variables 863 from config_loader import get_bluesky_config 864 bluesky_config = get_bluesky_config() 865 pds_host = bluesky_config['pds_uri'] 866 867 # Create acknowledgment record with stream.thought.ack type 868 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 869 ack_record = { 870 "$type": "stream.thought.ack", 871 "subject": { 872 "uri": post_uri, 873 "cid": post_cid 874 }, 875 "createdAt": now, 876 "note": note # Will be null if no note provided 877 } 878 879 
# Create the record 880 headers = {"Authorization": f"Bearer {access_token}"} 881 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 882 883 create_data = { 884 "repo": user_did, 885 "collection": "stream.thought.ack", 886 "record": ack_record 887 } 888 889 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 890 response.raise_for_status() 891 result = response.json() 892 893 logger.info(f"Successfully acknowledged post: {post_uri}") 894 return result 895 896 except Exception as e: 897 logger.error(f"Error acknowledging post: {e}") 898 return None 899 900 901def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 902 """ 903 Create a stream.thought.tool_call record to track tool usage. 904 905 This creates a record of tool calls made by void during processing, 906 allowing for analysis of tool usage patterns and debugging. 907 908 Args: 909 client: Authenticated Bluesky client 910 tool_name: Name of the tool being called 911 arguments: Raw JSON string of the tool arguments 912 tool_call_id: Optional ID of the tool call for correlation 913 914 Returns: 915 The response from creating the tool call record or None if failed 916 """ 917 try: 918 import requests 919 import json 920 from datetime import datetime, timezone 921 922 # Get session info from the client 923 access_token = None 924 user_did = None 925 926 # Try different ways to get the session info 927 if hasattr(client, '_session') and client._session: 928 access_token = client._session.access_jwt 929 user_did = client._session.did 930 elif hasattr(client, 'access_jwt'): 931 access_token = client.access_jwt 932 user_did = client.did if hasattr(client, 'did') else None 933 else: 934 logger.error("Cannot access client session information") 935 return None 936 937 if not access_token or not user_did: 938 logger.error("Missing access token or DID from session") 939 return 
None 940 941 # Get PDS URI from config instead of environment variables 942 from config_loader import get_bluesky_config 943 bluesky_config = get_bluesky_config() 944 pds_host = bluesky_config['pds_uri'] 945 946 # Create tool call record 947 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 948 tool_record = { 949 "$type": "stream.thought.tool.call", 950 "tool_name": tool_name, 951 "arguments": arguments, # Store as string to avoid parsing issues 952 "createdAt": now 953 } 954 955 # Add tool_call_id if provided 956 if tool_call_id: 957 tool_record["tool_call_id"] = tool_call_id 958 959 # Create the record 960 headers = {"Authorization": f"Bearer {access_token}"} 961 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 962 963 create_data = { 964 "repo": user_did, 965 "collection": "stream.thought.tool.call", 966 "record": tool_record 967 } 968 969 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 970 if response.status_code != 200: 971 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}") 972 response.raise_for_status() 973 result = response.json() 974 975 logger.debug(f"Successfully recorded tool call: {tool_name}") 976 return result 977 978 except Exception as e: 979 logger.error(f"Error creating tool call record: {e}") 980 return None 981 982 983def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]: 984 """ 985 Create a stream.thought.reasoning record to track agent reasoning. 986 987 This creates a record of void's reasoning during message processing, 988 providing transparency into the decision-making process. 
989 990 Args: 991 client: Authenticated Bluesky client 992 reasoning_text: The reasoning text from the agent 993 994 Returns: 995 The response from creating the reasoning record or None if failed 996 """ 997 try: 998 import requests 999 import json 1000 from datetime import datetime, timezone 1001 1002 # Get session info from the client 1003 access_token = None 1004 user_did = None 1005 1006 # Try different ways to get the session info 1007 if hasattr(client, '_session') and client._session: 1008 access_token = client._session.access_jwt 1009 user_did = client._session.did 1010 elif hasattr(client, 'access_jwt'): 1011 access_token = client.access_jwt 1012 user_did = client.did if hasattr(client, 'did') else None 1013 else: 1014 logger.error("Cannot access client session information") 1015 return None 1016 1017 if not access_token or not user_did: 1018 logger.error("Missing access token or DID from session") 1019 return None 1020 1021 # Get PDS URI from config instead of environment variables 1022 from config_loader import get_bluesky_config 1023 bluesky_config = get_bluesky_config() 1024 pds_host = bluesky_config['pds_uri'] 1025 1026 # Create reasoning record 1027 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1028 reasoning_record = { 1029 "$type": "stream.thought.reasoning", 1030 "reasoning": reasoning_text, 1031 "createdAt": now 1032 } 1033 1034 # Create the record 1035 headers = {"Authorization": f"Bearer {access_token}"} 1036 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1037 1038 create_data = { 1039 "repo": user_did, 1040 "collection": "stream.thought.reasoning", 1041 "record": reasoning_record 1042 } 1043 1044 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1045 response.raise_for_status() 1046 result = response.json() 1047 1048 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)") 1049 return result 1050 1051 except Exception as e: 1052 
logger.error(f"Error creating reasoning record: {e}") 1053 return None 1054 1055 1056def create_memory_record(client: Client, content: str, tags: Optional[List[str]] = None) -> Optional[Dict[str, Any]]: 1057 """ 1058 Create a stream.thought.memory record to store archival memory insertions. 1059 1060 This creates a record of archival_memory_insert tool calls, preserving 1061 important memories and context in the AT Protocol. 1062 1063 Args: 1064 client: Authenticated Bluesky client 1065 content: The memory content being archived 1066 tags: Optional list of tags associated with this memory 1067 1068 Returns: 1069 The response from creating the memory record or None if failed 1070 """ 1071 try: 1072 import requests 1073 import json 1074 from datetime import datetime, timezone 1075 1076 # Get session info from the client 1077 access_token = None 1078 user_did = None 1079 1080 # Try different ways to get the session info 1081 if hasattr(client, '_session') and client._session: 1082 access_token = client._session.access_jwt 1083 user_did = client._session.did 1084 elif hasattr(client, 'access_jwt'): 1085 access_token = client.access_jwt 1086 user_did = client.did if hasattr(client, 'did') else None 1087 else: 1088 logger.error("Cannot access client session information") 1089 return None 1090 1091 if not access_token or not user_did: 1092 logger.error("Missing access token or DID from session") 1093 return None 1094 1095 # Get PDS URI from config instead of environment variables 1096 from config_loader import get_bluesky_config 1097 bluesky_config = get_bluesky_config() 1098 pds_host = bluesky_config['pds_uri'] 1099 1100 # Create memory record 1101 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1102 memory_record = { 1103 "$type": "stream.thought.memory", 1104 "content": content, 1105 "createdAt": now 1106 } 1107 1108 # Add tags if provided (can be null) 1109 if tags is not None: 1110 memory_record["tags"] = tags 1111 1112 # Create the record 1113 
headers = {"Authorization": f"Bearer {access_token}"} 1114 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1115 1116 create_data = { 1117 "repo": user_did, 1118 "collection": "stream.thought.memory", 1119 "record": memory_record 1120 } 1121 1122 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1123 response.raise_for_status() 1124 result = response.json() 1125 1126 tags_info = f" with {len(tags)} tags" if tags else " (no tags)" 1127 logger.debug(f"Successfully recorded memory (length: {len(content)} chars{tags_info})") 1128 return result 1129 1130 except Exception as e: 1131 logger.error(f"Error creating memory record: {e}") 1132 return None 1133 1134 1135def sync_followers(client: Client, dry_run: bool = False) -> Dict[str, Any]: 1136 """ 1137 Check who is following the bot and who the bot is following, 1138 then follow back users who aren't already followed. 1139 1140 This implements the autofollow feature by creating follow records 1141 (app.bsky.graph.follow) for users who follow the bot. 
1142 1143 Args: 1144 client: Authenticated Bluesky client 1145 dry_run: If True, only report what would be done without actually following 1146 1147 Returns: 1148 Dict with stats: { 1149 'followers_count': int, 1150 'following_count': int, 1151 'to_follow': List[str], # List of handles to follow 1152 'newly_followed': List[str], # List of handles actually followed (empty if dry_run) 1153 'errors': List[str] # Any errors encountered 1154 } 1155 """ 1156 try: 1157 from datetime import datetime, timezone 1158 1159 # Get session info from the client 1160 access_token = None 1161 user_did = None 1162 1163 if hasattr(client, '_session') and client._session: 1164 access_token = client._session.access_jwt 1165 user_did = client._session.did 1166 elif hasattr(client, 'access_jwt'): 1167 access_token = client.access_jwt 1168 user_did = client.did if hasattr(client, 'did') else None 1169 else: 1170 logger.error("Cannot access client session information") 1171 return {'error': 'Cannot access client session'} 1172 1173 if not access_token or not user_did: 1174 logger.error("Missing access token or DID from session") 1175 return {'error': 'Missing access token or DID'} 1176 1177 # Get PDS URI from config 1178 from config_loader import get_bluesky_config 1179 bluesky_config = get_bluesky_config() 1180 pds_host = bluesky_config['pds_uri'] 1181 1182 # Get followers using the API 1183 followers_response = client.app.bsky.graph.get_followers({'actor': user_did}) 1184 followers = followers_response.followers if hasattr(followers_response, 'followers') else [] 1185 follower_dids = {f.did for f in followers} 1186 1187 # Get following using the API 1188 following_response = client.app.bsky.graph.get_follows({'actor': user_did}) 1189 following = following_response.follows if hasattr(following_response, 'follows') else [] 1190 following_dids = {f.did for f in following} 1191 1192 # Find users who follow us but we don't follow back 1193 to_follow_dids = follower_dids - following_dids 1194 
1195 # Build result object 1196 result = { 1197 'followers_count': len(followers), 1198 'following_count': len(following), 1199 'to_follow': [], 1200 'newly_followed': [], 1201 'errors': [] 1202 } 1203 1204 # Get handles for users to follow 1205 to_follow_handles = [] 1206 for follower in followers: 1207 if follower.did in to_follow_dids: 1208 handle = follower.handle if hasattr(follower, 'handle') else follower.did 1209 to_follow_handles.append(handle) 1210 result['to_follow'].append(handle) 1211 1212 logger.info(f"Follower sync: {len(followers)} followers, {len(following)} following, {len(to_follow_dids)} to follow") 1213 1214 # If dry run, just return the stats 1215 if dry_run: 1216 logger.info(f"Dry run - would follow: {', '.join(to_follow_handles)}") 1217 return result 1218 1219 # Actually follow the users with rate limiting 1220 import requests 1221 headers = {"Authorization": f"Bearer {access_token}"} 1222 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1223 1224 for i, did in enumerate(to_follow_dids): 1225 try: 1226 # Rate limiting: wait 2 seconds between follows to avoid spamming the server 1227 if i > 0: 1228 time.sleep(2) 1229 1230 # Create follow record 1231 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1232 follow_record = { 1233 "$type": "app.bsky.graph.follow", 1234 "subject": did, 1235 "createdAt": now 1236 } 1237 1238 create_data = { 1239 "repo": user_did, 1240 "collection": "app.bsky.graph.follow", 1241 "record": follow_record 1242 } 1243 1244 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1245 response.raise_for_status() 1246 1247 # Find handle for this DID 1248 handle = next((f.handle for f in followers if f.did == did), did) 1249 result['newly_followed'].append(handle) 1250 logger.info(f"Followed: {handle}") 1251 1252 except Exception as e: 1253 error_msg = f"Failed to follow {did}: {e}" 1254 logger.error(error_msg) 1255 result['errors'].append(error_msg) 
1256 1257 return result 1258 1259 except Exception as e: 1260 logger.error(f"Error syncing followers: {e}") 1261 return {'error': str(e)} 1262 1263 1264if __name__ == "__main__": 1265 client = default_login() 1266 # do something with the client 1267 logger.info("Client is ready to use!")