a digital person for bluesky
42
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 455f7bbcdae8b7e5113d2a3c23f4cce87207642e 677 lines 24 kB view raw
"""Bluesky session handling and posting helpers.

This module provides:

* conversion of thread responses into a compact YAML string,
* login with on-disk session reuse,
* helpers to reply to posts / notifications (single reply or a threaded chain),
* creation of custom ``stream.thought.ack`` acknowledgment records.
"""
import json
import logging
import os
import re
import sys
from typing import Any, Dict, List, Optional

import dotenv
import yaml
from atproto_client import Client, Session, SessionEvent, models

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables (values from .env override the process environment)
dotenv.load_dotenv(override=True)

# Strip fields. A list of fields to remove from a JSON object
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "facets",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]

# Maximum number of posts accepted by reply_with_thread_to_notification.
MAX_THREAD_REPLIES = 15

# Text posted in place of a reply that could not be sent.
SYSTEM_FAILURE_TEXT = "[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]"

# Both patterns run against the UTF-8 encoded text because facet indices are
# byte offsets. Group 1 is the mention/URL itself, without the leading
# boundary character.
MENTION_REGEX = re.compile(
    rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
)
URL_REGEX = re.compile(
    rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
)


def _get_field(obj, name, default=None):
    """Read ``name`` from a dict (by key) or any other object (by attribute)."""
    if isinstance(obj, dict):
        return obj.get(name, default)
    return getattr(obj, name, default)


def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Objects exposing ``__dict__`` are converted through that dictionary,
    dicts/lists/tuples are converted element-wise (tuples become lists),
    ``str``/``int``/``float``/``bool``/``None`` pass through unchanged and
    anything else is converted with ``str()``.
    """
    if hasattr(obj, '__dict__'):
        # Convert objects with __dict__ to their dictionary representation
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_to_basic_types(item) for item in obj]
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    # For other types, fall back to the string representation
    return str(obj)


def _is_blank(value):
    """Return True for None, empty dicts/lists and whitespace-only strings."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    if isinstance(value, str):
        return value.strip() == ""
    return False


def _strip_in_place(obj, blocked):
    """Recursive worker for strip_fields; ``blocked`` is a set of key names."""
    if isinstance(obj, dict):
        for key in list(obj):
            # Drop listed fields and dunder-prefixed (pydantic metadata) keys.
            # Only string keys can carry the "__" prefix.
            if key in blocked or (isinstance(key, str) and key.startswith("__")):
                del obj[key]
                continue
            cleaned = _strip_in_place(obj[key], blocked)
            # Remove empty/null values after processing
            if _is_blank(cleaned):
                del obj[key]
            else:
                obj[key] = cleaned
    elif isinstance(obj, list):
        processed = [_strip_in_place(item, blocked) for item in obj]
        # Only None is removed from lists; empty containers/strings are kept
        obj[:] = [item for item in processed if item is not None]
    return obj


def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object, in place.

    Args:
        obj: A dict/list structure (other values are returned unchanged).
        strip_field_list: Iterable of key names to remove from every dict.

    Returns:
        The same ``obj``, mutated: listed keys and keys starting with ``__``
        are removed, as are dict entries whose value ends up None, an empty
        dict/list, or a whitespace-only string. None items are removed from
        lists.
    """
    return _strip_in_place(obj, frozenset(strip_field_list))


def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a list while preserving all data.

    Only the chain of ``parent`` links starting at ``thread_data.thread`` is
    followed; each node's ``post`` is collected, oldest ancestor first.
    Nodes and the container may be objects or dicts.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with 'posts' key containing a list of posts in chronological order
    """
    # Walk from the requested post up through its ancestors...
    chain = []
    node = _get_field(thread_data, 'thread')
    while node:
        chain.append(node)
        node = _get_field(node, 'parent')

    # ...then emit from the oldest ancestor down to the requested post.
    posts = []
    for node in reversed(chain):
        post = _get_field(node, 'post')
        if not post:
            continue
        # Shallow-copy so later in-place stripping does not touch the source
        if hasattr(post, '__dict__'):
            posts.append(post.__dict__.copy())
        elif isinstance(post, dict):
            posts.append(post.copy())
        else:
            posts.append({})

    # Return a simple structure with posts list
    return {'posts': posts}


def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # First flatten the thread structure to avoid deep nesting
    flattened = flatten_thread_structure(thread)

    # Convert complex objects to basic types (this builds fresh containers)
    basic_thread = convert_to_basic_types(flattened)

    if strip_metadata:
        # Strips in place; safe because basic_thread was freshly built above
        cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
    else:
        cleaned_thread = basic_thread

    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def get_session(username: str) -> Optional[str]:
    """Return the saved session string for ``username``, or None if no file exists."""
    try:
        with open(f"session_{username}.txt", encoding="UTF-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None


def save_session(username: str, session_string: str) -> None:
    """Write ``session_string`` to ``session_<username>.txt`` in the working directory."""
    with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Persist the session whenever it is created or refreshed."""
    logger.debug(f"Session changed: {event} {repr(session)}")
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug(f"Saving changed session for {username}")
        save_session(username, session.export())


def init_client(username: str, password: str) -> Client:
    """Create a logged-in client, reusing a saved session when possible.

    The PDS is taken from the ``PDS_URI`` environment variable and defaults
    to ``https://bsky.social``. If a saved session exists but cannot be
    used, a fresh password login is attempted instead.

    Args:
        username: Account handle, also used to name the session file.
        password: Account password used when no usable session exists.

    Returns:
        The authenticated client.
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # A stale or corrupt session file must not block login forever
            logger.warning(
                f"Could not reuse saved session for {username}: {e}. Logging in with password instead."
            )

    logger.debug(f"Creating new session for {username}")
    client.login(username, password)
    return client


def default_login() -> Client:
    """Log in with ``BSKY_USERNAME`` / ``BSKY_PASSWORD`` from the environment.

    Exits the process with status 1 if either variable is missing.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    if username is None:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        sys.exit(1)

    if password is None:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        sys.exit(1)

    return init_client(username, password)


def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Re-check the length after stripping: a lone '"' both starts and ends
    # with a quote but is not a quoted string.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text


def _parse_mention_facets(client: Client, text_bytes: bytes) -> list:
    """Build mention facets for every @handle that resolves to a profile."""
    facets = []
    for m in MENTION_REGEX.finditer(text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
        except Exception as e:
            # Unresolvable handles are left as plain text
            logger.debug(f"Failed to resolve handle {handle}: {e}")
            continue
        if resolve_resp and hasattr(resolve_resp, 'did'):
            facets.append(
                models.AppBskyRichtextFacet.Main(
                    # Group 1 offsets exclude the optional boundary character
                    index=models.AppBskyRichtextFacet.ByteSlice(
                        byteStart=m.start(1),
                        byteEnd=m.end(1)
                    ),
                    features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                )
            )
    return facets


def _parse_link_facets(text_bytes: bytes) -> list:
    """Build link facets for every http(s) URL found in the text."""
    return [
        models.AppBskyRichtextFacet.Main(
            index=models.AppBskyRichtextFacet.ByteSlice(
                byteStart=m.start(1),
                byteEnd=m.end(1)
            ),
            features=[models.AppBskyRichtextFacet.Link(uri=m.group(1).decode("UTF-8"))]
        )
        for m in URL_REGEX.finditer(text_bytes)
    ]


def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Mentions and URLs in ``text`` are converted to facets. Errors from the
    API are not caught here; they propagate to the caller.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from sending the post
    """
    # Without a complete root reference this is a reply to the root post.
    # URI and CID are replaced together so they always describe one post.
    if root_uri is None or root_cid is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions and URLs) on the encoded text
    text_bytes = text.encode("UTF-8")
    facets = _parse_mention_facets(client, text_bytes) + _parse_link_facets(text_bytes)

    response = client.send_post(
        text=text,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        facets=facets or None,  # send no facets field when none were found
        langs=[lang] if lang else None
    )

    logger.info(f"Reply sent successfully: {response.uri}")
    return response


def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread data or None if not found
    """
    try:
        return client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None


def _extract_reply_target(notification: Any):
    """Return ``(post_uri, post_cid, root_uri, root_cid)`` for a notification.

    Works for dict and object notifications. ``post_uri``/``post_cid`` are
    None when the notification lacks them. The root comes from
    ``record.reply.root`` when that carries both a URI and a CID; otherwise
    the notified post itself is treated as the root.
    """
    post_uri = _get_field(notification, 'uri')
    post_cid = _get_field(notification, 'cid')

    record = _get_field(notification, 'record')
    reply_info = _get_field(record, 'reply')
    root = _get_field(reply_info, 'root') if reply_info else None

    root_uri = _get_field(root, 'uri')
    root_cid = _get_field(root, 'cid')
    if not root_uri or not root_cid:
        # No (complete) root info means this post IS the root
        root_uri, root_cid = post_uri, post_cid

    return post_uri, post_cid, root_uri, root_cid


def _send_reply_safely(client: Client, text: str, parent_uri: str, parent_cid: str, root_uri: str, root_cid: str, lang: Optional[str]):
    """Call reply_to_post and return its response, or None if it raised."""
    try:
        return reply_to_post(
            client=client,
            text=text,
            reply_to_uri=parent_uri,
            reply_to_cid=parent_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )
    except Exception as e:
        logger.error(f"Error sending reply: {e}")
        return None


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply or None if failed
    """
    try:
        post_uri, post_cid, root_uri, root_cid = _extract_reply_target(notification)

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Reply to the notification
        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previous one. When a message
    cannot be sent, a system failure notice is posted in its place and the
    chain continues; if the notice cannot be sent either, sending stops.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 15 messages, each max 300 chars)
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses from sending the replies or None if failed
    """
    try:
        # Validate input
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > MAX_THREAD_REPLIES:
            logger.error(f"Cannot send more than 15 reply messages (got {len(reply_messages)})")
            return None

        post_uri, post_cid, root_uri, root_cid = _extract_reply_target(notification)

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Send replies in sequence, creating a thread
        responses = []
        current_parent_uri = post_uri
        current_parent_cid = post_cid

        for i, message in enumerate(reply_messages):
            logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")

            response = _send_reply_safely(
                client, message, current_parent_uri, current_parent_cid, root_uri, root_cid, lang
            )

            if response is None:
                logger.error(f"Failed to send reply {i+1}, posting system failure message")
                # Try to post a system failure message in its place
                response = _send_reply_safely(
                    client, SYSTEM_FAILURE_TEXT, current_parent_uri, current_parent_cid, root_uri, root_cid, lang
                )
                if response is None:
                    logger.error("Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The post just sent becomes the parent of the next reply
            current_parent_uri = response.uri
            current_parent_cid = response.cid

        logger.info(f"Successfully sent {len(responses)} threaded replies")
        return responses

    except Exception as e:
        logger.error(f"Error sending threaded reply to notification: {e}")
        return None


def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    This creates a custom acknowledgment record instead of a standard Bluesky like,
    allowing void to track which posts it has engaged with.

    The record is created with a direct HTTP call to
    ``com.atproto.repo.createRecord`` on the PDS given by ``PDS_URI``
    (default ``https://bsky.social``), authenticated with the client's
    current access token.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Get session info from the client; try the known places it may live
        if hasattr(client, '_session') and client._session:
            access_token = client._session.access_jwt
            user_did = client._session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = client.did if hasattr(client, 'did') else None
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # Trailing slashes are dropped so the endpoint URL has no "//"
        pds_host = os.getenv("PDS_URI", "https://bsky.social").rstrip("/")

        # Create acknowledgment record with stream.thought.ack type
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": {
                "uri": post_uri,
                "cid": post_cid
            },
            "createdAt": now,
            "note": note  # Will be null if no note provided
        }

        # Create the record
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        create_data = {
            "repo": user_did,
            "collection": "stream.thought.ack",
            "record": ack_record
        }

        response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
        response.raise_for_status()
        result = response.json()

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None


if __name__ == "__main__":
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")