A digital person for Bluesky
at main 77 kB view raw
1import os 2import logging 3import uuid 4import time 5from typing import Optional, Dict, Any, List 6from atproto_client import Client, Session, SessionEvent, models 7 8# Configure logging 9logging.basicConfig( 10 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" 11) 12logger = logging.getLogger("bluesky_session_handler") 13 14# Load the environment variables 15import dotenv 16dotenv.load_dotenv(override=True) 17 18import yaml 19import json 20 21# Strip fields. A list of fields to remove from a JSON object 22STRIP_FIELDS = [ 23 "cid", 24 "rev", 25 "did", 26 "uri", 27 "langs", 28 "threadgate", 29 "py_type", 30 "labels", 31 "avatar", 32 "viewer", 33 "indexed_at", 34 "tags", 35 "associated", 36 "thread_context", 37 "aspect_ratio", 38 "thumb", 39 "fullsize", 40 "root", 41 "created_at", 42 "verification", 43 "like_count", 44 "quote_count", 45 "reply_count", 46 "repost_count", 47 "embedding_disabled", 48 "thread_muted", 49 "reply_disabled", 50 "pinned", 51 "like", 52 "repost", 53 "blocked_by", 54 "blocking", 55 "blocking_by_list", 56 "followed_by", 57 "following", 58 "known_followers", 59 "muted", 60 "muted_by_list", 61 "root_author_like", 62 "entities", 63 "ref", 64 "mime_type", 65 "size", 66] 67def convert_to_basic_types(obj): 68 """Convert complex Python objects to basic types for JSON/YAML serialization.""" 69 if hasattr(obj, '__dict__'): 70 # Convert objects with __dict__ to their dictionary representation 71 return convert_to_basic_types(obj.__dict__) 72 elif isinstance(obj, dict): 73 return {key: convert_to_basic_types(value) for key, value in obj.items()} 74 elif isinstance(obj, list): 75 return [convert_to_basic_types(item) for item in obj] 76 elif isinstance(obj, (str, int, float, bool)) or obj is None: 77 return obj 78 else: 79 # For other types, try to convert to string 80 return str(obj) 81 82 83def strip_fields(obj, strip_field_list): 84 """Recursively strip fields from a JSON object.""" 85 if isinstance(obj, dict): 86 
keys_flagged_for_removal = [] 87 88 # Remove fields from strip list and pydantic metadata 89 for field in list(obj.keys()): 90 if field in strip_field_list or field.startswith("__"): 91 keys_flagged_for_removal.append(field) 92 93 # Remove flagged keys 94 for key in keys_flagged_for_removal: 95 obj.pop(key, None) 96 97 # Recursively process remaining values 98 for key, value in list(obj.items()): 99 obj[key] = strip_fields(value, strip_field_list) 100 # Remove empty/null values after processing 101 if ( 102 obj[key] is None 103 or (isinstance(obj[key], dict) and len(obj[key]) == 0) 104 or (isinstance(obj[key], list) and len(obj[key]) == 0) 105 or (isinstance(obj[key], str) and obj[key].strip() == "") 106 ): 107 obj.pop(key, None) 108 109 elif isinstance(obj, list): 110 for i, value in enumerate(obj): 111 obj[i] = strip_fields(value, strip_field_list) 112 # Remove None values from list 113 obj[:] = [item for item in obj if item is not None] 114 115 return obj 116 117 118def extract_links_from_facets(record_text: str, facets: list) -> list: 119 """ 120 Extract link URLs from facets with their associated text. 
121 122 Args: 123 record_text: The post text (needed to extract link text using byte offsets) 124 facets: List of facet objects from post record 125 126 Returns: 127 List of dicts with 'url' and 'text' keys 128 """ 129 links = [] 130 text_bytes = record_text.encode('utf-8') 131 132 for facet in facets: 133 for feature in facet.features: 134 if hasattr(feature, 'uri'): # Link facet 135 byte_start = facet.index.byte_start 136 byte_end = facet.index.byte_end 137 try: 138 link_text = text_bytes[byte_start:byte_end].decode('utf-8') 139 except (UnicodeDecodeError, IndexError): 140 link_text = feature.uri # Fallback to URL itself 141 links.append({ 142 'url': feature.uri, 143 'text': link_text 144 }) 145 return links 146 147 148def extract_images_from_embed(embed, include_thumbnails: bool = True) -> list[dict]: 149 """Extract image URLs and alt text from a post embed (View type). 150 151 This function handles the View types returned by get_post_thread(), 152 which contain CDN URLs for images (unlike raw record embeds which 153 only have BlobRefs). 154 155 Also extracts thumbnails from external links and videos when include_thumbnails=True. 
156 157 Args: 158 embed: The embed object from post.embed (View type) 159 include_thumbnails: Whether to include thumbnails from links/videos (default True) 160 161 Returns: 162 List of dicts with 'fullsize', 'thumb', 'alt', and optional 'source' keys 163 """ 164 images = [] 165 if not embed: 166 return images 167 168 embed_type = getattr(embed, 'py_type', '') 169 170 # Direct image embed (app.bsky.embed.images#view) 171 if 'images' in embed_type and 'record' not in embed_type: 172 for img in embed.images: 173 images.append({ 174 'fullsize': getattr(img, 'fullsize', None), 175 'thumb': getattr(img, 'thumb', None), 176 'alt': getattr(img, 'alt', '') or '' 177 }) 178 179 # External link with thumbnail (app.bsky.embed.external#view) 180 elif 'external' in embed_type and 'record' not in embed_type and include_thumbnails: 181 if hasattr(embed, 'external') and embed.external: 182 thumb = getattr(embed.external, 'thumb', None) 183 if thumb: 184 title = getattr(embed.external, 'title', '') or '' 185 images.append({ 186 'fullsize': thumb, # External links only have thumb, use as fullsize too 187 'thumb': thumb, 188 'alt': f"Link preview: {title}" if title else 'Link preview image', 189 'source': 'external_link' 190 }) 191 192 # Video with thumbnail (app.bsky.embed.video#view) 193 elif 'video' in embed_type and 'record' not in embed_type and include_thumbnails: 194 thumb = getattr(embed, 'thumbnail', None) 195 if thumb: 196 alt = getattr(embed, 'alt', '') or 'Video thumbnail' 197 images.append({ 198 'fullsize': thumb, 199 'thumb': thumb, 200 'alt': alt, 201 'source': 'video' 202 }) 203 204 # Quote post with media (app.bsky.embed.recordWithMedia#view) 205 elif 'recordWithMedia' in embed_type and hasattr(embed, 'media'): 206 media_type = getattr(embed.media, 'py_type', '') 207 # Images in media 208 if 'images' in media_type and hasattr(embed.media, 'images'): 209 for img in embed.media.images: 210 images.append({ 211 'fullsize': getattr(img, 'fullsize', None), 212 'thumb': 
getattr(img, 'thumb', None), 213 'alt': getattr(img, 'alt', '') or '' 214 }) 215 # External link thumbnail in media 216 elif 'external' in media_type and include_thumbnails: 217 if hasattr(embed.media, 'external') and embed.media.external: 218 thumb = getattr(embed.media.external, 'thumb', None) 219 if thumb: 220 title = getattr(embed.media.external, 'title', '') or '' 221 images.append({ 222 'fullsize': thumb, 223 'thumb': thumb, 224 'alt': f"Link preview: {title}" if title else 'Link preview image', 225 'source': 'external_link' 226 }) 227 # Video thumbnail in media 228 elif 'video' in media_type and include_thumbnails: 229 thumb = getattr(embed.media, 'thumbnail', None) 230 if thumb: 231 alt = getattr(embed.media, 'alt', '') or 'Video thumbnail' 232 images.append({ 233 'fullsize': thumb, 234 'thumb': thumb, 235 'alt': alt, 236 'source': 'video' 237 }) 238 239 # Quote post - check for images in nested embeds (app.bsky.embed.record#view) 240 elif 'record' in embed_type and 'recordWithMedia' not in embed_type: 241 if hasattr(embed, 'record') and embed.record: 242 record = embed.record 243 if hasattr(record, 'embeds') and record.embeds: 244 for nested in record.embeds: 245 nested_type = getattr(nested, 'py_type', '') 246 # Nested images 247 if 'images' in nested_type and hasattr(nested, 'images'): 248 for img in nested.images: 249 images.append({ 250 'fullsize': getattr(img, 'fullsize', None), 251 'thumb': getattr(img, 'thumb', None), 252 'alt': getattr(img, 'alt', '') or '', 253 'source': 'quoted_post' 254 }) 255 # Nested external link thumbnail 256 elif 'external' in nested_type and include_thumbnails: 257 if hasattr(nested, 'external') and nested.external: 258 thumb = getattr(nested.external, 'thumb', None) 259 if thumb: 260 title = getattr(nested.external, 'title', '') or '' 261 images.append({ 262 'fullsize': thumb, 263 'thumb': thumb, 264 'alt': f"Link preview: {title}" if title else 'Link preview image', 265 'source': 'quoted_post_link' 266 }) 267 # Nested 
video thumbnail 268 elif 'video' in nested_type and include_thumbnails: 269 thumb = getattr(nested, 'thumbnail', None) 270 if thumb: 271 alt = getattr(nested, 'alt', '') or 'Video thumbnail' 272 images.append({ 273 'fullsize': thumb, 274 'thumb': thumb, 275 'alt': alt, 276 'source': 'quoted_post_video' 277 }) 278 279 return images 280 281 282def extract_images_from_thread(thread_data, max_images: int = 8) -> list[dict]: 283 """Extract all images from a thread, up to max_images. 284 285 Traverses the thread structure and extracts image URLs from post embeds. 286 Images are collected in chronological order (parents before children). 287 288 Args: 289 thread_data: The thread data from get_post_thread 290 max_images: Maximum number of images to extract (default 8) 291 292 Returns: 293 List of image dicts with 'fullsize', 'thumb', 'alt', 'author_handle' keys 294 """ 295 images = [] 296 297 def traverse_thread(node): 298 if not node or len(images) >= max_images: 299 return 300 301 # Traverse parent first (chronological order) 302 if hasattr(node, 'parent') and node.parent: 303 traverse_thread(node.parent) 304 305 # Extract images from this post's embed (View type, not record.embed) 306 if hasattr(node, 'post') and node.post: 307 post = node.post 308 if hasattr(post, 'embed') and post.embed: 309 post_images = extract_images_from_embed(post.embed) 310 author_handle = getattr(post.author, 'handle', 'unknown') if hasattr(post, 'author') else 'unknown' 311 for img in post_images: 312 if len(images) >= max_images: 313 break 314 img['author_handle'] = author_handle 315 images.append(img) 316 317 # Traverse replies 318 if hasattr(node, 'replies') and node.replies: 319 for reply in node.replies: 320 if len(images) >= max_images: 321 break 322 traverse_thread(reply) 323 324 if hasattr(thread_data, 'thread'): 325 traverse_thread(thread_data.thread) 326 327 return images 328 329 330def extract_external_link_from_embed(embed) -> dict | None: 331 """Extract external link card data 
from a post embed (View type). 332 333 External links are shown as "link cards" with URL, title, description, 334 and optional thumbnail. 335 336 Args: 337 embed: The embed object from post.embed (View type) 338 339 Returns: 340 Dict with 'url', 'title', 'description', 'thumbnail' keys, or None 341 """ 342 if not embed: 343 return None 344 345 embed_type = getattr(embed, 'py_type', '') 346 347 # Direct external link embed (app.bsky.embed.external#view) 348 if 'external' in embed_type and hasattr(embed, 'external'): 349 external = embed.external 350 return { 351 'url': getattr(external, 'uri', ''), 352 'title': getattr(external, 'title', ''), 353 'description': getattr(external, 'description', ''), 354 'thumbnail': getattr(external, 'thumb', None) 355 } 356 357 # RecordWithMedia with external link (app.bsky.embed.recordWithMedia#view) 358 if 'recordWithMedia' in embed_type and hasattr(embed, 'media'): 359 media_type = getattr(embed.media, 'py_type', '') 360 if 'external' in media_type and hasattr(embed.media, 'external'): 361 external = embed.media.external 362 return { 363 'url': getattr(external, 'uri', ''), 364 'title': getattr(external, 'title', ''), 365 'description': getattr(external, 'description', ''), 366 'thumbnail': getattr(external, 'thumb', None) 367 } 368 369 return None 370 371 372def extract_quote_post_from_embed(embed) -> dict | None: 373 """Extract quoted post data from a record embed (View type). 374 375 Quote posts embed another post, which can include the quoted text, 376 author, and any media attached to the quoted post. 
377 378 Args: 379 embed: The embed object from post.embed (View type) 380 381 Returns: 382 Dict with quote post data, or None if not a quote or unavailable 383 """ 384 if not embed: 385 return None 386 387 embed_type = getattr(embed, 'py_type', '') 388 389 # Get the record object (works for both record and recordWithMedia) 390 record = None 391 if 'recordWithMedia' in embed_type and hasattr(embed, 'record'): 392 # recordWithMedia has record.record for the actual quote 393 record = getattr(embed.record, 'record', None) 394 elif 'record' in embed_type and hasattr(embed, 'record'): 395 record = embed.record 396 397 if not record: 398 return None 399 400 record_type = getattr(record, 'py_type', '') 401 402 # Handle different quote post states 403 if 'viewNotFound' in record_type: 404 return { 405 'status': 'not_found', 406 'uri': getattr(record, 'uri', ''), 407 'message': 'Quoted post was deleted or not found' 408 } 409 410 if 'viewBlocked' in record_type: 411 return { 412 'status': 'blocked', 413 'uri': getattr(record, 'uri', ''), 414 'message': 'Quoted post is from a blocked account' 415 } 416 417 if 'viewDetached' in record_type: 418 return { 419 'status': 'detached', 420 'uri': getattr(record, 'uri', ''), 421 'message': 'Quoted post was detached' 422 } 423 424 # Normal quote post (viewRecord) 425 if 'viewRecord' in record_type or hasattr(record, 'author'): 426 result = { 427 'status': 'available', 428 'uri': getattr(record, 'uri', ''), 429 } 430 431 # Extract author info 432 if hasattr(record, 'author') and record.author: 433 author = record.author 434 result['author'] = { 435 'handle': getattr(author, 'handle', 'unknown'), 436 'display_name': getattr(author, 'display_name', '') or getattr(author, 'handle', 'unknown') 437 } 438 439 # Extract the quoted post text from value 440 # The 'value' field contains the actual post record 441 if hasattr(record, 'value') and record.value: 442 value = record.value 443 # value can be a dict or an object 444 if isinstance(value, 
dict): 445 result['text'] = value.get('text', '') 446 elif hasattr(value, 'text'): 447 result['text'] = getattr(value, 'text', '') 448 449 # Extract engagement metrics if present 450 metrics = {} 451 if hasattr(record, 'like_count') and record.like_count is not None: 452 metrics['likes'] = record.like_count 453 if hasattr(record, 'repost_count') and record.repost_count is not None: 454 metrics['reposts'] = record.repost_count 455 if hasattr(record, 'reply_count') and record.reply_count is not None: 456 metrics['replies'] = record.reply_count 457 if hasattr(record, 'quote_count') and record.quote_count is not None: 458 metrics['quotes'] = record.quote_count 459 if metrics: 460 result['metrics'] = metrics 461 462 # Add thread context hints (for hybrid thread navigation) 463 thread_context = {} 464 465 # Reply count indicates replies exist below this post 466 if metrics.get('replies'): 467 thread_context['reply_count'] = metrics['replies'] 468 469 # Check if quoted post is itself a reply (has parents above) 470 if hasattr(record, 'value') and record.value: 471 value = record.value 472 reply_ref = value.get('reply') if isinstance(value, dict) else getattr(value, 'reply', None) 473 if reply_ref: 474 thread_context['has_parents'] = True 475 476 if thread_context: 477 result['thread_context'] = thread_context 478 479 # Check for nested embeds in the quoted post 480 if hasattr(record, 'embeds') and record.embeds: 481 nested_embeds = [] 482 for nested in record.embeds: 483 nested_type = getattr(nested, 'py_type', '') 484 if 'images' in nested_type: 485 nested_embeds.append({'type': 'images', 'count': len(getattr(nested, 'images', []))}) 486 elif 'video' in nested_type: 487 nested_embeds.append({'type': 'video'}) 488 elif 'external' in nested_type: 489 ext = getattr(nested, 'external', None) 490 if ext: 491 nested_embeds.append({ 492 'type': 'external_link', 493 'url': getattr(ext, 'uri', ''), 494 'title': getattr(ext, 'title', '') 495 }) 496 if nested_embeds: 497 
result['embeds'] = nested_embeds 498 499 return result 500 501 return None 502 503 504def extract_embed_data(embed) -> dict | None: 505 """Extract structured data from any embed type. 506 507 This is the main entry point for embed extraction. It detects the embed 508 type and delegates to the appropriate extraction function. 509 510 Args: 511 embed: The embed object from post.embed (View type) 512 513 Returns: 514 Dict with embed type and extracted data, or None if no embed 515 """ 516 if not embed: 517 return None 518 519 embed_type = getattr(embed, 'py_type', '') 520 521 # Images 522 if 'images' in embed_type and 'record' not in embed_type: 523 images = extract_images_from_embed(embed) 524 if images: 525 return { 526 'type': 'images', 527 'images': images 528 } 529 530 # External link 531 if 'external' in embed_type and 'record' not in embed_type: 532 link = extract_external_link_from_embed(embed) 533 if link: 534 return { 535 'type': 'external_link', 536 'link': link 537 } 538 539 # Quote post (record) 540 if embed_type == 'app.bsky.embed.record#view': 541 quote = extract_quote_post_from_embed(embed) 542 if quote: 543 return { 544 'type': 'quote_post', 545 'quote': quote 546 } 547 548 # Quote post with media (recordWithMedia) 549 if 'recordWithMedia' in embed_type: 550 result = {'type': 'quote_with_media'} 551 552 # Extract the quote 553 quote = extract_quote_post_from_embed(embed) 554 if quote: 555 result['quote'] = quote 556 557 # Extract the media 558 if hasattr(embed, 'media'): 559 media_type = getattr(embed.media, 'py_type', '') 560 if 'images' in media_type: 561 images = extract_images_from_embed(embed) 562 if images: 563 result['media'] = {'type': 'images', 'images': images} 564 elif 'external' in media_type: 565 link = extract_external_link_from_embed(embed) 566 if link: 567 result['media'] = {'type': 'external_link', 'link': link} 568 elif 'video' in media_type: 569 # Basic video info 570 result['media'] = { 571 'type': 'video', 572 'thumbnail': 
getattr(embed.media, 'thumbnail', None), 573 'alt': getattr(embed.media, 'alt', None) 574 } 575 576 return result 577 578 # Video (basic handling) 579 if 'video' in embed_type: 580 return { 581 'type': 'video', 582 'thumbnail': getattr(embed, 'thumbnail', None), 583 'alt': getattr(embed, 'alt', None) 584 } 585 586 return None 587 588 589def flatten_thread_structure(thread_data): 590 """ 591 Flatten a nested thread structure into a list while preserving all data. 592 593 Args: 594 thread_data: The thread data from get_post_thread 595 596 Returns: 597 Dict with 'posts' key containing a list of posts in chronological order 598 """ 599 posts = [] 600 601 def traverse_thread(node): 602 """Recursively traverse the thread structure to collect posts.""" 603 if not node: 604 return 605 606 # If this node has a parent, traverse it first (to maintain chronological order) 607 if hasattr(node, 'parent') and node.parent: 608 traverse_thread(node.parent) 609 610 # Then add this node's post 611 if hasattr(node, 'post') and node.post: 612 # Extract post data by accessing properties directly (not __dict__) 613 # AT Protocol objects store data in properties, not __dict__ 614 post = node.post 615 616 # Build post dict with proper property access 617 post_dict = {} 618 619 # Extract basic fields 620 if hasattr(post, 'uri'): 621 post_dict['uri'] = post.uri 622 if hasattr(post, 'cid'): 623 post_dict['cid'] = post.cid 624 625 # Extract author info 626 if hasattr(post, 'author') and post.author: 627 author = post.author 628 post_dict['author'] = { 629 'handle': getattr(author, 'handle', 'unknown'), 630 'display_name': getattr(author, 'display_name', 'unknown'), 631 'did': getattr(author, 'did', 'unknown') 632 } 633 634 # Extract record info (text, created_at, etc.) 
635 if hasattr(post, 'record') and post.record: 636 record = post.record 637 record_dict = { 638 'text': getattr(record, 'text', ''), 639 'createdAt': getattr(record, 'created_at', 'unknown') 640 } 641 642 # Extract links from facets if present 643 if hasattr(record, 'facets') and record.facets: 644 links = extract_links_from_facets( 645 getattr(record, 'text', ''), 646 record.facets 647 ) 648 if links: 649 record_dict['links'] = links 650 651 post_dict['record'] = record_dict 652 653 # Extract embed data from post.embed (View type with CDN URLs) 654 # This is different from record.embed which only has raw BlobRefs 655 if hasattr(post, 'embed') and post.embed: 656 embed_data = extract_embed_data(post.embed) 657 if embed_data: 658 post_dict['embed'] = embed_data 659 660 # Extract parent_uri for tree visualization 661 parent_uri = None 662 if hasattr(post, 'record') and post.record: 663 record_obj = post.record 664 if hasattr(record_obj, 'reply') and record_obj.reply: 665 reply_ref = record_obj.reply 666 if hasattr(reply_ref, 'parent') and reply_ref.parent: 667 if hasattr(reply_ref.parent, 'uri'): 668 parent_uri = reply_ref.parent.uri 669 post_dict['parent_uri'] = parent_uri 670 671 posts.append(post_dict) 672 673 # Then traverse any replies (going DOWN the thread) 674 if hasattr(node, 'replies') and node.replies: 675 for reply in node.replies: 676 traverse_thread(reply) 677 678 # Handle the thread structure 679 if hasattr(thread_data, 'thread'): 680 # Start from the main thread node 681 traverse_thread(thread_data.thread) 682 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__: 683 traverse_thread(thread_data.__dict__['thread']) 684 685 # Return a simple structure with posts list 686 return {'posts': posts} 687 688 689def count_thread_posts(thread): 690 """ 691 Count the number of posts in a thread. 
692 693 Args: 694 thread: The thread data from get_post_thread 695 696 Returns: 697 Integer count of posts in the thread 698 """ 699 flattened = flatten_thread_structure(thread) 700 return len(flattened.get('posts', [])) 701 702 703def compute_tree_prefixes(posts: List[Dict]) -> Dict[str, str]: 704 """ 705 Compute tree-style prefixes based on parent relationships. 706 707 Args: 708 posts: List of post dicts, each with 'uri' and 'parent_uri' keys 709 710 Returns: 711 Dict mapping uri -> prefix string (e.g., "├─ ", "│ └─ ") 712 """ 713 if not posts: 714 return {} 715 716 uri_to_post = {p.get('uri'): p for p in posts if p.get('uri')} 717 children_map: Dict[str, List[str]] = {} # parent_uri -> [child_uris] 718 root_uris: List[str] = [] 719 720 for post in posts: 721 uri = post.get('uri') 722 if not uri: 723 continue 724 parent_uri = post.get('parent_uri') 725 if not parent_uri or parent_uri not in uri_to_post: 726 root_uris.append(uri) 727 else: 728 children_map.setdefault(parent_uri, []).append(uri) 729 730 prefixes: Dict[str, str] = {} 731 visited: set = set() 732 733 def compute_recursive(uri: str, ancestors_last: List[bool]): 734 if uri in visited: 735 return 736 visited.add(uri) 737 738 prefix_parts = [] 739 for is_last in ancestors_last[:-1]: 740 prefix_parts.append(" " if is_last else "") 741 if ancestors_last: 742 prefix_parts.append("└─ " if ancestors_last[-1] else "├─ ") 743 prefixes[uri] = "".join(prefix_parts) 744 745 children = children_map.get(uri, []) 746 for i, child_uri in enumerate(children): 747 compute_recursive(child_uri, ancestors_last + [i == len(children) - 1]) 748 749 for i, root_uri in enumerate(root_uris): 750 if len(root_uris) == 1: 751 prefixes[root_uri] = "" 752 children = children_map.get(root_uri, []) 753 for j, child_uri in enumerate(children): 754 compute_recursive(child_uri, [j == len(children) - 1]) 755 else: 756 compute_recursive(root_uri, [i == len(root_uris) - 1]) 757 758 return prefixes 759 760 761def build_tree_view(posts: 
List[Dict]) -> str: 762 """ 763 Build a tree-style text visualization of a thread. 764 765 Args: 766 posts: List of post dicts with uri, parent_uri, author, record fields 767 768 Returns: 769 Multi-line string showing thread structure with tree prefixes 770 """ 771 if not posts: 772 return "(empty thread)" 773 774 prefixes = compute_tree_prefixes(posts) 775 lines = [] 776 777 for post in posts: 778 uri = post.get('uri', '') 779 prefix = prefixes.get(uri, '') 780 781 author = post.get('author', {}) 782 handle = author.get('handle', 'unknown') 783 record = post.get('record', {}) 784 text = record.get('text', '').replace('\n', ' | ') 785 786 lines.append(f"{prefix}@{handle}: {text}") 787 788 return "\n".join(lines) 789 790 791def thread_to_yaml_string(thread, strip_metadata=True, include_tree_view=True): 792 """ 793 Convert thread data to a YAML-formatted string for LLM parsing. 794 795 Args: 796 thread: The thread data from get_post_thread 797 strip_metadata: Whether to strip metadata fields for cleaner output 798 include_tree_view: Whether to prepend a tree visualization of the thread 799 800 Returns: 801 String representation of the thread with optional tree view and YAML data 802 """ 803 # First flatten the thread structure to avoid deep nesting 804 flattened = flatten_thread_structure(thread) 805 posts = flattened.get('posts', []) 806 807 output_parts = [] 808 809 # Build tree visualization if requested 810 if include_tree_view and posts: 811 tree_view = build_tree_view(posts) 812 output_parts.append("THREAD STRUCTURE:") 813 output_parts.append(tree_view) 814 output_parts.append("") 815 output_parts.append("FULL POST DATA:") 816 817 # Convert complex objects to basic types 818 basic_thread = convert_to_basic_types(flattened) 819 820 if strip_metadata: 821 # Create a copy and strip unwanted fields 822 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS) 823 else: 824 cleaned_thread = basic_thread 825 826 yaml_output = yaml.dump(cleaned_thread, indent=2, 
allow_unicode=True, default_flow_style=False) 827 output_parts.append(yaml_output) 828 829 return "\n".join(output_parts) 830 831 832 833 834 835 836 837def get_session(username: str) -> Optional[str]: 838 try: 839 with open(f"session_{username}.txt", encoding="UTF-8") as f: 840 return f.read() 841 except FileNotFoundError: 842 logger.debug(f"No existing session found for {username}") 843 return None 844 845def save_session(username: str, session_string: str) -> None: 846 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f: 847 f.write(session_string) 848 logger.debug(f"Session saved for {username}") 849 850def on_session_change(username: str, event: SessionEvent, session: Session) -> None: 851 logger.debug(f"Session changed: {event} {repr(session)}") 852 if event in (SessionEvent.CREATE, SessionEvent.REFRESH): 853 logger.debug(f"Saving changed session for {username}") 854 save_session(username, session.export()) 855 856def init_client(username: str, password: str) -> Client: 857 pds_uri = os.getenv("PDS_URI") 858 if pds_uri is None: 859 logger.warning( 860 "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable." 
861 ) 862 pds_uri = "https://bsky.social" 863 864 # Print the PDS URI 865 logger.debug(f"Using PDS URI: {pds_uri}") 866 867 client = Client(pds_uri) 868 client.on_session_change( 869 lambda event, session: on_session_change(username, event, session) 870 ) 871 872 session_string = get_session(username) 873 if session_string: 874 logger.debug(f"Reusing existing session for {username}") 875 client.login(session_string=session_string) 876 else: 877 logger.debug(f"Creating new session for {username}") 878 client.login(username, password) 879 880 return client 881 882 883def default_login() -> Client: 884 """Login using configuration from config.yaml or environment variables.""" 885 try: 886 from config_loader import get_bluesky_config 887 bluesky_config = get_bluesky_config() 888 889 username = bluesky_config['username'] 890 password = bluesky_config['password'] 891 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social') 892 893 logger.info(f"Logging into Bluesky as {username} via {pds_uri}") 894 895 # Use pds_uri from config 896 client = Client(base_url=pds_uri) 897 client.login(username, password) 898 return client 899 900 except Exception as e: 901 logger.error(f"Failed to load Bluesky configuration: {e}") 902 logger.error("Please check your config.yaml file or environment variables") 903 exit(1) 904 905def remove_outside_quotes(text: str) -> str: 906 """ 907 Remove outside double quotes from response text. 
908 909 Only handles double quotes to avoid interfering with contractions: 910 - Double quotes: "text" → text 911 - Preserves single quotes and internal quotes 912 913 Args: 914 text: The text to process 915 916 Returns: 917 Text with outside double quotes removed 918 """ 919 if not text or len(text) < 2: 920 return text 921 922 text = text.strip() 923 924 # Only remove double quotes from start and end 925 if text.startswith('"') and text.endswith('"'): 926 return text[1:-1] 927 928 return text 929 930def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]: 931 """ 932 Reply to a post on Bluesky with rich text support. 933 934 Args: 935 client: Authenticated Bluesky client 936 text: The reply text 937 reply_to_uri: The URI of the post being replied to (parent) 938 reply_to_cid: The CID of the post being replied to (parent) 939 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri 940 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid 941 lang: Language code for the post (e.g., 'en-US', 'es', 'ja') 942 correlation_id: Unique ID for tracking this message through the pipeline 943 944 Returns: 945 The response from sending the post 946 """ 947 import re 948 949 # Generate correlation ID if not provided 950 if correlation_id is None: 951 correlation_id = str(uuid.uuid4())[:8] 952 953 # Enhanced logging with structured data 954 logger.info(f"[{correlation_id}] Starting reply_to_post", extra={ 955 'correlation_id': correlation_id, 956 'text_length': len(text), 957 'text_preview': text[:100] + '...' 
if len(text) > 100 else text, 958 'reply_to_uri': reply_to_uri, 959 'root_uri': root_uri, 960 'lang': lang 961 }) 962 963 start_time = time.time() 964 965 # If root is not provided, this is a reply to the root post 966 if root_uri is None: 967 root_uri = reply_to_uri 968 root_cid = reply_to_cid 969 970 # Create references for the reply 971 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid)) 972 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid)) 973 974 # Parse rich text facets (mentions and URLs) 975 facets = [] 976 text_bytes = text.encode("UTF-8") 977 mentions_found = [] 978 urls_found = [] 979 980 logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)") 981 982 # Parse mentions - fixed to handle @ at start of text 983 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)" 984 985 for m in re.finditer(mention_regex, text_bytes): 986 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix 987 mentions_found.append(handle) 988 # Adjust byte positions to account for the optional prefix 989 mention_start = m.start(1) 990 mention_end = m.end(1) 991 try: 992 # Resolve handle to DID using the API 993 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle}) 994 if resolve_resp and hasattr(resolve_resp, 'did'): 995 facets.append( 996 models.AppBskyRichtextFacet.Main( 997 index=models.AppBskyRichtextFacet.ByteSlice( 998 byteStart=mention_start, 999 byteEnd=mention_end 1000 ), 1001 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)] 1002 ) 1003 ) 1004 logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}") 1005 except Exception as e: 1006 logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}") 1007 continue 1008 1009 # Parse URLs - fixed to handle URLs at start of text 1010 
url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)" 1011 1012 for m in re.finditer(url_regex, text_bytes): 1013 url = m.group(1).decode("UTF-8") 1014 urls_found.append(url) 1015 # Adjust byte positions to account for the optional prefix 1016 url_start = m.start(1) 1017 url_end = m.end(1) 1018 facets.append( 1019 models.AppBskyRichtextFacet.Main( 1020 index=models.AppBskyRichtextFacet.ByteSlice( 1021 byteStart=url_start, 1022 byteEnd=url_end 1023 ), 1024 features=[models.AppBskyRichtextFacet.Link(uri=url)] 1025 ) 1026 ) 1027 logger.debug(f"[{correlation_id}] Found URL: {url}") 1028 1029 # Parse hashtags 1030 hashtag_regex = rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)" 1031 hashtags_found = [] 1032 1033 for m in re.finditer(hashtag_regex, text_bytes): 1034 tag = m.group(1).decode("UTF-8") # Get tag without # prefix 1035 hashtags_found.append(tag) 1036 # Get byte positions for the entire hashtag including # 1037 tag_start = m.start(0) 1038 # Adjust start if there's a space/prefix 1039 if text_bytes[tag_start:tag_start+1] in (b' ', b'$'): 1040 tag_start += 1 1041 tag_end = m.end(0) 1042 facets.append( 1043 models.AppBskyRichtextFacet.Main( 1044 index=models.AppBskyRichtextFacet.ByteSlice( 1045 byteStart=tag_start, 1046 byteEnd=tag_end 1047 ), 1048 features=[models.AppBskyRichtextFacet.Tag(tag=tag)] 1049 ) 1050 ) 1051 logger.debug(f"[{correlation_id}] Found hashtag: #{tag}") 1052 1053 logger.debug(f"[{correlation_id}] Facet parsing complete", extra={ 1054 'correlation_id': correlation_id, 1055 'mentions_count': len(mentions_found), 1056 'mentions': mentions_found, 1057 'urls_count': len(urls_found), 1058 'urls': urls_found, 1059 'hashtags_count': len(hashtags_found), 1060 'hashtags': hashtags_found, 1061 'total_facets': len(facets) 1062 }) 1063 1064 # Send the reply with facets if any were found 1065 logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={ 1066 
'correlation_id': correlation_id, 1067 'has_facets': bool(facets), 1068 'facet_count': len(facets), 1069 'lang': lang 1070 }) 1071 1072 try: 1073 if facets: 1074 response = client.send_post( 1075 text=text, 1076 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 1077 facets=facets, 1078 langs=[lang] if lang else None 1079 ) 1080 else: 1081 response = client.send_post( 1082 text=text, 1083 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref), 1084 langs=[lang] if lang else None 1085 ) 1086 1087 # Calculate response time 1088 response_time = time.time() - start_time 1089 1090 # Extract post URL for user-friendly logging 1091 post_url = None 1092 if hasattr(response, 'uri') and response.uri: 1093 # Convert AT-URI to web URL 1094 # Format: at://did:plc:xxx/app.bsky.feed.post/xxx -> https://bsky.app/profile/handle/post/xxx 1095 try: 1096 uri_parts = response.uri.split('/') 1097 if len(uri_parts) >= 4 and uri_parts[3] == 'app.bsky.feed.post': 1098 rkey = uri_parts[4] 1099 # We'd need to resolve DID to handle, but for now just use the URI 1100 post_url = f"bsky://post/{rkey}" 1101 except: 1102 pass 1103 1104 logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" + 1105 (f" - URL: {post_url}" if post_url else ""), extra={ 1106 'correlation_id': correlation_id, 1107 'response_time': round(response_time, 3), 1108 'post_uri': response.uri, 1109 'post_url': post_url, 1110 'post_cid': getattr(response, 'cid', None), 1111 'text_length': len(text) 1112 }) 1113 1114 return response 1115 1116 except Exception as e: 1117 response_time = time.time() - start_time 1118 logger.error(f"[{correlation_id}] Failed to send reply", extra={ 1119 'correlation_id': correlation_id, 1120 'error': str(e), 1121 'error_type': type(e).__name__, 1122 'response_time': round(response_time, 3), 1123 'text_length': len(text) 1124 }) 1125 raise 1126 1127 1128def get_post_thread(client: Client, uri: str) -> 
Optional[Dict[str, Any]]: 1129 """ 1130 Get the thread containing a post to find root post information. 1131 1132 Args: 1133 client: Authenticated Bluesky client 1134 uri: The URI of the post 1135 1136 Returns: 1137 The thread data or None if not found 1138 """ 1139 try: 1140 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10}) 1141 return thread 1142 except Exception as e: 1143 logger.error(f"Error fetching post thread: {e}") 1144 return None 1145 1146 1147def find_last_consecutive_post_in_chain(thread_node, author_handle: str): 1148 """ 1149 Find the last consecutive post in the direct reply chain by the same author. 1150 1151 Starting from the given thread node, this function traverses down the direct reply chain 1152 (not all branches) to find the last consecutive post made by the specified author. 1153 1154 Args: 1155 thread_node: The thread node to start from (usually the mention post's thread node) 1156 author_handle: The handle of the author to match (e.g., "user.bsky.social") 1157 1158 Returns: 1159 Tuple of (uri, cid, text) for the last consecutive post by the author, or None if no consecutive posts 1160 1161 Example: 1162 If the thread structure is: 1163 - Post A by @alice (mention) -> thread_node starts here 1164 - Post B by @alice (consecutive) 1165 - Post C by @alice (consecutive) 1166 - Post D by @bob (different author, stop here) 1167 1168 Returns (uri_C, cid_C, text_C) 1169 """ 1170 if not thread_node: 1171 return None 1172 1173 # Start with the current node's post 1174 current_post = None 1175 if hasattr(thread_node, 'post') and thread_node.post: 1176 current_post = thread_node.post 1177 1178 if not current_post: 1179 return None 1180 1181 # Check if current post is by the target author 1182 current_author = None 1183 if hasattr(current_post, 'author') and hasattr(current_post.author, 'handle'): 1184 current_author = current_post.author.handle 1185 1186 if current_author != author_handle: 1187 # Current 
post is not by target author, can't find consecutive posts 1188 return None 1189 1190 # Track the last consecutive post (start with current) 1191 last_uri = current_post.uri if hasattr(current_post, 'uri') else None 1192 last_cid = current_post.cid if hasattr(current_post, 'cid') else None 1193 last_text = "" 1194 if hasattr(current_post, 'record') and hasattr(current_post.record, 'text'): 1195 last_text = current_post.record.text 1196 1197 # Traverse down the direct reply chain 1198 current_node = thread_node 1199 while True: 1200 # Check if there are replies to this node 1201 if not hasattr(current_node, 'replies') or not current_node.replies: 1202 # No more replies, we've found the last consecutive post 1203 break 1204 1205 # For direct chain traversal, we look for replies by the same author 1206 # If there are multiple replies, we'll take the first one by the same author 1207 next_node = None 1208 for reply in current_node.replies: 1209 if hasattr(reply, 'post') and reply.post: 1210 reply_author = None 1211 if hasattr(reply.post, 'author') and hasattr(reply.post.author, 'handle'): 1212 reply_author = reply.post.author.handle 1213 1214 if reply_author == author_handle: 1215 # Found a consecutive post by same author 1216 next_node = reply 1217 break 1218 1219 if not next_node: 1220 # No more consecutive posts by same author 1221 break 1222 1223 # Update last post info to this consecutive post 1224 current_node = next_node 1225 current_post = current_node.post 1226 1227 if hasattr(current_post, 'uri'): 1228 last_uri = current_post.uri 1229 if hasattr(current_post, 'cid'): 1230 last_cid = current_post.cid 1231 if hasattr(current_post, 'record') and hasattr(current_post.record, 'text'): 1232 last_text = current_post.record.text 1233 1234 # Return the last consecutive post's metadata 1235 # Only return if we actually have valid URI and CID 1236 if last_uri and last_cid: 1237 return (last_uri, last_cid, last_text) 1238 1239 return None 1240 1241 1242def 
find_consecutive_parent_posts_by_author(thread_node, author_handle: str) -> List[Dict]: 1243 """ 1244 Find consecutive posts by the same author in the parent chain. 1245 1246 Starting from the given thread node, this function traverses UP the parent chain 1247 to find all consecutive posts made by the specified author. 1248 1249 This is the inverse of find_last_consecutive_post_in_chain which traverses DOWN. 1250 1251 Args: 1252 thread_node: The thread node to start from (the notification post's thread node) 1253 author_handle: The handle of the author to match (e.g., "user.bsky.social") 1254 1255 Returns: 1256 List of post dicts for consecutive posts by the author in the parent chain, 1257 in chronological order (oldest first). Returns empty list if no parent posts 1258 by the same author. 1259 1260 Example: 1261 If the thread structure is: 1262 - Post A by @alice (first part) 1263 - Post B by @alice (consecutive) <- start from here (notification) 1264 1265 Returns [Post A dict] (not including Post B since that's the current node) 1266 """ 1267 parent_posts = [] 1268 1269 if not thread_node: 1270 return parent_posts 1271 1272 # Traverse up the parent chain 1273 current_node = thread_node 1274 while True: 1275 # Check if this node has a parent 1276 if not hasattr(current_node, 'parent') or not current_node.parent: 1277 break 1278 1279 parent_node = current_node.parent 1280 if not hasattr(parent_node, 'post') or not parent_node.post: 1281 break 1282 1283 parent_post = parent_node.post 1284 1285 # Check if parent is by the same author 1286 parent_author = None 1287 if hasattr(parent_post, 'author') and hasattr(parent_post.author, 'handle'): 1288 parent_author = parent_post.author.handle 1289 1290 if parent_author != author_handle: 1291 # Parent is by different author, stop here 1292 break 1293 1294 # Collect this parent post 1295 post_dict = { 1296 'uri': getattr(parent_post, 'uri', ''), 1297 'cid': getattr(parent_post, 'cid', ''), 1298 'author': { 1299 'handle': 
parent_author, 1300 'display_name': getattr(parent_post.author, 'display_name', '') if hasattr(parent_post, 'author') else '', 1301 'did': getattr(parent_post.author, 'did', '') if hasattr(parent_post, 'author') else '' 1302 }, 1303 'record': { 1304 'text': getattr(parent_post.record, 'text', '') if hasattr(parent_post, 'record') else '', 1305 'createdAt': getattr(parent_post.record, 'created_at', '') if hasattr(parent_post, 'record') else '' 1306 } 1307 } 1308 parent_posts.append(post_dict) 1309 1310 # Move up to the next parent 1311 current_node = parent_node 1312 1313 # Return in chronological order (oldest first) 1314 parent_posts.reverse() 1315 return parent_posts 1316 1317 1318def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 1319 """ 1320 Reply to a notification (mention or reply). 1321 1322 Args: 1323 client: Authenticated Bluesky client 1324 notification: The notification object from list_notifications 1325 reply_text: The text to reply with 1326 lang: Language code for the post (defaults to "en-US") 1327 correlation_id: Unique ID for tracking this message through the pipeline 1328 1329 Returns: 1330 The response from sending the reply or None if failed 1331 """ 1332 # Generate correlation ID if not provided 1333 if correlation_id is None: 1334 correlation_id = str(uuid.uuid4())[:8] 1335 1336 logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={ 1337 'correlation_id': correlation_id, 1338 'reply_length': len(reply_text), 1339 'lang': lang 1340 }) 1341 1342 try: 1343 # Get the post URI and CID from the notification (handle both dict and object) 1344 if isinstance(notification, dict): 1345 post_uri = notification.get('uri') 1346 post_cid = notification.get('cid') 1347 # Check if the notification record has reply info with root 1348 record = notification.get('record', {}) 1349 reply_info = record.get('reply') if 
isinstance(record, dict) else None 1350 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 1351 post_uri = notification.uri 1352 post_cid = notification.cid 1353 # Check if the notification record has reply info with root 1354 reply_info = None 1355 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 1356 reply_info = notification.record.reply 1357 else: 1358 post_uri = None 1359 post_cid = None 1360 reply_info = None 1361 1362 if not post_uri or not post_cid: 1363 logger.error("Notification doesn't have required uri/cid fields") 1364 return None 1365 1366 # Determine root: if post has reply info, use its root; otherwise this post IS the root 1367 if reply_info: 1368 # Extract root from the notification's reply structure 1369 if isinstance(reply_info, dict): 1370 root_ref = reply_info.get('root') 1371 if root_ref and isinstance(root_ref, dict): 1372 root_uri = root_ref.get('uri', post_uri) 1373 root_cid = root_ref.get('cid', post_cid) 1374 else: 1375 # No root in reply info, use post as root 1376 root_uri = post_uri 1377 root_cid = post_cid 1378 elif hasattr(reply_info, 'root'): 1379 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 1380 root_uri = reply_info.root.uri 1381 root_cid = reply_info.root.cid 1382 else: 1383 root_uri = post_uri 1384 root_cid = post_cid 1385 else: 1386 root_uri = post_uri 1387 root_cid = post_cid 1388 else: 1389 # No reply info means this post IS the root 1390 root_uri = post_uri 1391 root_cid = post_cid 1392 1393 # Reply to the notification 1394 return reply_to_post( 1395 client=client, 1396 text=reply_text, 1397 reply_to_uri=post_uri, 1398 reply_to_cid=post_cid, 1399 root_uri=root_uri, 1400 root_cid=root_cid, 1401 lang=lang, 1402 correlation_id=correlation_id 1403 ) 1404 1405 except Exception as e: 1406 logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={ 1407 'correlation_id': correlation_id, 1408 'error': str(e), 1409 'error_type': 
type(e).__name__ 1410 }) 1411 return None 1412 1413 1414def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]: 1415 """ 1416 Reply to a notification with a threaded chain of messages (max 15). 1417 1418 Args: 1419 client: Authenticated Bluesky client 1420 notification: The notification object from list_notifications 1421 reply_messages: List of reply texts (max 15 messages, each max 300 chars) 1422 lang: Language code for the posts (defaults to "en-US") 1423 correlation_id: Unique ID for tracking this message through the pipeline 1424 1425 Returns: 1426 List of responses from sending the replies or None if failed 1427 """ 1428 # Generate correlation ID if not provided 1429 if correlation_id is None: 1430 correlation_id = str(uuid.uuid4())[:8] 1431 1432 logger.info(f"[{correlation_id}] Starting threaded reply", extra={ 1433 'correlation_id': correlation_id, 1434 'message_count': len(reply_messages), 1435 'total_length': sum(len(msg) for msg in reply_messages), 1436 'lang': lang 1437 }) 1438 1439 try: 1440 # Validate input 1441 if not reply_messages or len(reply_messages) == 0: 1442 logger.error(f"[{correlation_id}] Reply messages list cannot be empty") 1443 return None 1444 if len(reply_messages) > 15: 1445 logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})") 1446 return None 1447 1448 # Get the post URI and CID from the notification (handle both dict and object) 1449 if isinstance(notification, dict): 1450 post_uri = notification.get('uri') 1451 post_cid = notification.get('cid') 1452 # Check if the notification record has reply info with root 1453 record = notification.get('record', {}) 1454 reply_info = record.get('reply') if isinstance(record, dict) else None 1455 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'): 1456 post_uri = notification.uri 1457 
post_cid = notification.cid 1458 # Check if the notification record has reply info with root 1459 reply_info = None 1460 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'): 1461 reply_info = notification.record.reply 1462 else: 1463 post_uri = None 1464 post_cid = None 1465 reply_info = None 1466 1467 if not post_uri or not post_cid: 1468 logger.error("Notification doesn't have required uri/cid fields") 1469 return None 1470 1471 # Determine root: if post has reply info, use its root; otherwise this post IS the root 1472 if reply_info: 1473 # Extract root from the notification's reply structure 1474 if isinstance(reply_info, dict): 1475 root_ref = reply_info.get('root') 1476 if root_ref and isinstance(root_ref, dict): 1477 root_uri = root_ref.get('uri', post_uri) 1478 root_cid = root_ref.get('cid', post_cid) 1479 else: 1480 # No root in reply info, use post as root 1481 root_uri = post_uri 1482 root_cid = post_cid 1483 elif hasattr(reply_info, 'root'): 1484 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'): 1485 root_uri = reply_info.root.uri 1486 root_cid = reply_info.root.cid 1487 else: 1488 root_uri = post_uri 1489 root_cid = post_cid 1490 else: 1491 root_uri = post_uri 1492 root_cid = post_cid 1493 else: 1494 # No reply info means this post IS the root 1495 root_uri = post_uri 1496 root_cid = post_cid 1497 1498 # Send replies in sequence, creating a thread 1499 responses = [] 1500 current_parent_uri = post_uri 1501 current_parent_cid = post_cid 1502 1503 for i, message in enumerate(reply_messages): 1504 thread_correlation_id = f"{correlation_id}-{i+1}" 1505 logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...") 1506 1507 # Send this reply 1508 response = reply_to_post( 1509 client=client, 1510 text=message, 1511 reply_to_uri=current_parent_uri, 1512 reply_to_cid=current_parent_cid, 1513 root_uri=root_uri, 1514 root_cid=root_cid, 1515 lang=lang, 1516 
correlation_id=thread_correlation_id 1517 ) 1518 1519 if not response: 1520 logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message") 1521 # Try to post a system failure message 1522 failure_response = reply_to_post( 1523 client=client, 1524 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]", 1525 reply_to_uri=current_parent_uri, 1526 reply_to_cid=current_parent_cid, 1527 root_uri=root_uri, 1528 root_cid=root_cid, 1529 lang=lang, 1530 correlation_id=f"{thread_correlation_id}-FAIL" 1531 ) 1532 if failure_response: 1533 responses.append(failure_response) 1534 current_parent_uri = failure_response.uri 1535 current_parent_cid = failure_response.cid 1536 else: 1537 logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread") 1538 return responses if responses else None 1539 else: 1540 responses.append(response) 1541 # Update parent references for next reply (if any) 1542 if i < len(reply_messages) - 1: # Not the last message 1543 current_parent_uri = response.uri 1544 current_parent_cid = response.cid 1545 1546 logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={ 1547 'correlation_id': correlation_id, 1548 'replies_sent': len(responses), 1549 'replies_requested': len(reply_messages) 1550 }) 1551 return responses 1552 1553 except Exception as e: 1554 logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={ 1555 'correlation_id': correlation_id, 1556 'error': str(e), 1557 'error_type': type(e).__name__, 1558 'message_count': len(reply_messages) 1559 }) 1560 return None 1561 1562 1563def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]: 1564 """ 1565 Create a stream.thought.ack record for synthesis without a target post. 1566 1567 This creates a synthesis acknowledgment with null subject field. 
1568 1569 Args: 1570 client: Authenticated Bluesky client 1571 note: The synthesis note/content 1572 1573 Returns: 1574 The response from creating the acknowledgment record or None if failed 1575 """ 1576 try: 1577 import requests 1578 import json 1579 from datetime import datetime, timezone 1580 1581 # Get session info from the client 1582 access_token = None 1583 user_did = None 1584 1585 # Try different ways to get the session info 1586 if hasattr(client, '_session') and client._session: 1587 access_token = client._session.access_jwt 1588 user_did = client._session.did 1589 elif hasattr(client, 'access_jwt'): 1590 access_token = client.access_jwt 1591 user_did = client.did if hasattr(client, 'did') else None 1592 else: 1593 logger.error("Cannot access client session information") 1594 return None 1595 1596 if not access_token or not user_did: 1597 logger.error("Missing access token or DID from session") 1598 return None 1599 1600 # Get PDS URI from config instead of environment variables 1601 from config_loader import get_bluesky_config 1602 bluesky_config = get_bluesky_config() 1603 pds_host = bluesky_config['pds_uri'] 1604 1605 # Create acknowledgment record with null subject 1606 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1607 ack_record = { 1608 "$type": "stream.thought.ack", 1609 "subject": None, # Null subject for synthesis 1610 "createdAt": now, 1611 "note": note 1612 } 1613 1614 # Create the record 1615 headers = {"Authorization": f"Bearer {access_token}"} 1616 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1617 1618 create_data = { 1619 "repo": user_did, 1620 "collection": "stream.thought.ack", 1621 "record": ack_record 1622 } 1623 1624 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1625 response.raise_for_status() 1626 result = response.json() 1627 1628 logger.info(f"Successfully created synthesis acknowledgment") 1629 return result 1630 1631 except Exception as 
e: 1632 logger.error(f"Error creating synthesis acknowledgment: {e}") 1633 return None 1634 1635 1636def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]: 1637 """ 1638 Create a stream.thought.ack record to acknowledge a post. 1639 1640 This creates a custom acknowledgment record instead of a standard Bluesky like, 1641 allowing void to track which posts it has engaged with. 1642 1643 Args: 1644 client: Authenticated Bluesky client 1645 post_uri: The URI of the post to acknowledge 1646 post_cid: The CID of the post to acknowledge 1647 note: Optional note to attach to the acknowledgment 1648 1649 Returns: 1650 The response from creating the acknowledgment record or None if failed 1651 """ 1652 try: 1653 import requests 1654 import json 1655 from datetime import datetime, timezone 1656 1657 # Get session info from the client 1658 # The atproto Client stores the session differently 1659 access_token = None 1660 user_did = None 1661 1662 # Try different ways to get the session info 1663 if hasattr(client, '_session') and client._session: 1664 access_token = client._session.access_jwt 1665 user_did = client._session.did 1666 elif hasattr(client, 'access_jwt'): 1667 access_token = client.access_jwt 1668 user_did = client.did if hasattr(client, 'did') else None 1669 else: 1670 logger.error("Cannot access client session information") 1671 return None 1672 1673 if not access_token or not user_did: 1674 logger.error("Missing access token or DID from session") 1675 return None 1676 1677 # Get PDS URI from config instead of environment variables 1678 from config_loader import get_bluesky_config 1679 bluesky_config = get_bluesky_config() 1680 pds_host = bluesky_config['pds_uri'] 1681 1682 # Create acknowledgment record with stream.thought.ack type 1683 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1684 ack_record = { 1685 "$type": "stream.thought.ack", 1686 "subject": { 1687 "uri": 
post_uri, 1688 "cid": post_cid 1689 }, 1690 "createdAt": now, 1691 "note": note # Will be null if no note provided 1692 } 1693 1694 # Create the record 1695 headers = {"Authorization": f"Bearer {access_token}"} 1696 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1697 1698 create_data = { 1699 "repo": user_did, 1700 "collection": "stream.thought.ack", 1701 "record": ack_record 1702 } 1703 1704 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1705 response.raise_for_status() 1706 result = response.json() 1707 1708 logger.info(f"Successfully acknowledged post: {post_uri}") 1709 return result 1710 1711 except Exception as e: 1712 logger.error(f"Error acknowledging post: {e}") 1713 return None 1714 1715 1716def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]: 1717 """ 1718 Create a stream.thought.tool_call record to track tool usage. 1719 1720 This creates a record of tool calls made by void during processing, 1721 allowing for analysis of tool usage patterns and debugging. 
1722 1723 Args: 1724 client: Authenticated Bluesky client 1725 tool_name: Name of the tool being called 1726 arguments: Raw JSON string of the tool arguments 1727 tool_call_id: Optional ID of the tool call for correlation 1728 1729 Returns: 1730 The response from creating the tool call record or None if failed 1731 """ 1732 try: 1733 import requests 1734 import json 1735 from datetime import datetime, timezone 1736 1737 # Get session info from the client 1738 access_token = None 1739 user_did = None 1740 1741 # Try different ways to get the session info 1742 if hasattr(client, '_session') and client._session: 1743 access_token = client._session.access_jwt 1744 user_did = client._session.did 1745 elif hasattr(client, 'access_jwt'): 1746 access_token = client.access_jwt 1747 user_did = client.did if hasattr(client, 'did') else None 1748 else: 1749 logger.error("Cannot access client session information") 1750 return None 1751 1752 if not access_token or not user_did: 1753 logger.error("Missing access token or DID from session") 1754 return None 1755 1756 # Get PDS URI from config instead of environment variables 1757 from config_loader import get_bluesky_config 1758 bluesky_config = get_bluesky_config() 1759 pds_host = bluesky_config['pds_uri'] 1760 1761 # Create tool call record 1762 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1763 tool_record = { 1764 "$type": "stream.thought.tool.call", 1765 "tool_name": tool_name, 1766 "arguments": arguments, # Store as string to avoid parsing issues 1767 "createdAt": now 1768 } 1769 1770 # Add tool_call_id if provided 1771 if tool_call_id: 1772 tool_record["tool_call_id"] = tool_call_id 1773 1774 # Create the record 1775 headers = {"Authorization": f"Bearer {access_token}"} 1776 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1777 1778 create_data = { 1779 "repo": user_did, 1780 "collection": "stream.thought.tool.call", 1781 "record": tool_record 1782 } 1783 1784 response = 
requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1785 if response.status_code != 200: 1786 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}") 1787 response.raise_for_status() 1788 result = response.json() 1789 1790 logger.debug(f"Successfully recorded tool call: {tool_name}") 1791 return result 1792 1793 except Exception as e: 1794 logger.error(f"Error creating tool call record: {e}") 1795 return None 1796 1797 1798def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]: 1799 """ 1800 Create a stream.thought.reasoning record to track agent reasoning. 1801 1802 This creates a record of void's reasoning during message processing, 1803 providing transparency into the decision-making process. 1804 1805 Args: 1806 client: Authenticated Bluesky client 1807 reasoning_text: The reasoning text from the agent 1808 1809 Returns: 1810 The response from creating the reasoning record or None if failed 1811 """ 1812 try: 1813 import requests 1814 import json 1815 from datetime import datetime, timezone 1816 1817 # Get session info from the client 1818 access_token = None 1819 user_did = None 1820 1821 # Try different ways to get the session info 1822 if hasattr(client, '_session') and client._session: 1823 access_token = client._session.access_jwt 1824 user_did = client._session.did 1825 elif hasattr(client, 'access_jwt'): 1826 access_token = client.access_jwt 1827 user_did = client.did if hasattr(client, 'did') else None 1828 else: 1829 logger.error("Cannot access client session information") 1830 return None 1831 1832 if not access_token or not user_did: 1833 logger.error("Missing access token or DID from session") 1834 return None 1835 1836 # Get PDS URI from config instead of environment variables 1837 from config_loader import get_bluesky_config 1838 bluesky_config = get_bluesky_config() 1839 pds_host = bluesky_config['pds_uri'] 1840 1841 # Create reasoning record 
1842 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1843 reasoning_record = { 1844 "$type": "stream.thought.reasoning", 1845 "reasoning": reasoning_text, 1846 "createdAt": now 1847 } 1848 1849 # Create the record 1850 headers = {"Authorization": f"Bearer {access_token}"} 1851 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1852 1853 create_data = { 1854 "repo": user_did, 1855 "collection": "stream.thought.reasoning", 1856 "record": reasoning_record 1857 } 1858 1859 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1860 response.raise_for_status() 1861 result = response.json() 1862 1863 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)") 1864 return result 1865 1866 except Exception as e: 1867 logger.error(f"Error creating reasoning record: {e}") 1868 return None 1869 1870 1871def create_memory_record(client: Client, content: str, tags: Optional[List[str]] = None) -> Optional[Dict[str, Any]]: 1872 """ 1873 Create a stream.thought.memory record to store archival memory insertions. 1874 1875 This creates a record of archival_memory_insert tool calls, preserving 1876 important memories and context in the AT Protocol. 
1877 1878 Args: 1879 client: Authenticated Bluesky client 1880 content: The memory content being archived 1881 tags: Optional list of tags associated with this memory 1882 1883 Returns: 1884 The response from creating the memory record or None if failed 1885 """ 1886 try: 1887 import requests 1888 import json 1889 from datetime import datetime, timezone 1890 1891 # Get session info from the client 1892 access_token = None 1893 user_did = None 1894 1895 # Try different ways to get the session info 1896 if hasattr(client, '_session') and client._session: 1897 access_token = client._session.access_jwt 1898 user_did = client._session.did 1899 elif hasattr(client, 'access_jwt'): 1900 access_token = client.access_jwt 1901 user_did = client.did if hasattr(client, 'did') else None 1902 else: 1903 logger.error("Cannot access client session information") 1904 return None 1905 1906 if not access_token or not user_did: 1907 logger.error("Missing access token or DID from session") 1908 return None 1909 1910 # Get PDS URI from config instead of environment variables 1911 from config_loader import get_bluesky_config 1912 bluesky_config = get_bluesky_config() 1913 pds_host = bluesky_config['pds_uri'] 1914 1915 # Create memory record 1916 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 1917 memory_record = { 1918 "$type": "stream.thought.memory", 1919 "content": content, 1920 "createdAt": now 1921 } 1922 1923 # Add tags if provided (can be null) 1924 if tags is not None: 1925 memory_record["tags"] = tags 1926 1927 # Create the record 1928 headers = {"Authorization": f"Bearer {access_token}"} 1929 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 1930 1931 create_data = { 1932 "repo": user_did, 1933 "collection": "stream.thought.memory", 1934 "record": memory_record 1935 } 1936 1937 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 1938 response.raise_for_status() 1939 result = response.json() 1940 1941 
tags_info = f" with {len(tags)} tags" if tags else " (no tags)" 1942 logger.debug(f"Successfully recorded memory (length: {len(content)} chars{tags_info})") 1943 return result 1944 1945 except Exception as e: 1946 logger.error(f"Error creating memory record: {e}") 1947 return None 1948 1949 1950def sync_followers(client: Client, dry_run: bool = False) -> Dict[str, Any]: 1951 """ 1952 Check who is following the bot and who the bot is following, 1953 then follow back users who aren't already followed. 1954 1955 This implements the autofollow feature by creating follow records 1956 (app.bsky.graph.follow) for users who follow the bot. 1957 1958 Args: 1959 client: Authenticated Bluesky client 1960 dry_run: If True, only report what would be done without actually following 1961 1962 Returns: 1963 Dict with stats: { 1964 'followers_count': int, 1965 'following_count': int, 1966 'to_follow': List[str], # List of handles to follow 1967 'newly_followed': List[str], # List of handles actually followed (empty if dry_run) 1968 'errors': List[str] # Any errors encountered 1969 } 1970 """ 1971 try: 1972 from datetime import datetime, timezone 1973 1974 # Get session info from the client 1975 access_token = None 1976 user_did = None 1977 1978 if hasattr(client, '_session') and client._session: 1979 access_token = client._session.access_jwt 1980 user_did = client._session.did 1981 elif hasattr(client, 'access_jwt'): 1982 access_token = client.access_jwt 1983 user_did = client.did if hasattr(client, 'did') else None 1984 else: 1985 logger.error("Cannot access client session information") 1986 return {'error': 'Cannot access client session'} 1987 1988 if not access_token or not user_did: 1989 logger.error("Missing access token or DID from session") 1990 return {'error': 'Missing access token or DID'} 1991 1992 # Get PDS URI from config 1993 from config_loader import get_bluesky_config 1994 bluesky_config = get_bluesky_config() 1995 pds_host = bluesky_config['pds_uri'] 1996 1997 # 
Get followers using the API 1998 followers_response = client.app.bsky.graph.get_followers({'actor': user_did}) 1999 followers = followers_response.followers if hasattr(followers_response, 'followers') else [] 2000 follower_dids = {f.did for f in followers} 2001 2002 # Get following using the API 2003 following_response = client.app.bsky.graph.get_follows({'actor': user_did}) 2004 following = following_response.follows if hasattr(following_response, 'follows') else [] 2005 following_dids = {f.did for f in following} 2006 2007 # Find users who follow us but we don't follow back 2008 to_follow_dids = follower_dids - following_dids 2009 2010 # Build result object 2011 result = { 2012 'followers_count': len(followers), 2013 'following_count': len(following), 2014 'to_follow': [], 2015 'newly_followed': [], 2016 'errors': [] 2017 } 2018 2019 # Get handles for users to follow 2020 to_follow_handles = [] 2021 for follower in followers: 2022 if follower.did in to_follow_dids: 2023 handle = follower.handle if hasattr(follower, 'handle') else follower.did 2024 to_follow_handles.append(handle) 2025 result['to_follow'].append(handle) 2026 2027 logger.info(f"Follower sync: {len(followers)} followers, {len(following)} following, {len(to_follow_dids)} to follow") 2028 2029 # If dry run, just return the stats 2030 if dry_run: 2031 logger.info(f"Dry run - would follow: {', '.join(to_follow_handles)}") 2032 return result 2033 2034 # Actually follow the users with rate limiting 2035 import requests 2036 headers = {"Authorization": f"Bearer {access_token}"} 2037 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord" 2038 2039 for i, did in enumerate(to_follow_dids): 2040 try: 2041 # Rate limiting: wait 2 seconds between follows to avoid spamming the server 2042 if i > 0: 2043 time.sleep(2) 2044 2045 # Create follow record 2046 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") 2047 follow_record = { 2048 "$type": "app.bsky.graph.follow", 2049 "subject": 
did, 2050 "createdAt": now 2051 } 2052 2053 create_data = { 2054 "repo": user_did, 2055 "collection": "app.bsky.graph.follow", 2056 "record": follow_record 2057 } 2058 2059 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10) 2060 response.raise_for_status() 2061 2062 # Find handle for this DID 2063 handle = next((f.handle for f in followers if f.did == did), did) 2064 result['newly_followed'].append(handle) 2065 logger.info(f"Followed: {handle}") 2066 2067 except Exception as e: 2068 error_msg = f"Failed to follow {did}: {e}" 2069 logger.error(error_msg) 2070 result['errors'].append(error_msg) 2071 2072 return result 2073 2074 except Exception as e: 2075 logger.error(f"Error syncing followers: {e}") 2076 return {'error': str(e)} 2077 2078 2079if __name__ == "__main__": 2080 client = default_login() 2081 # do something with the client 2082 logger.info("Client is ready to use!")