A digital person for Bluesky.
# Bluesky session handling and reply helpers built on ``atproto_client``.
#
# Provides:
#   * thread serialization for LLM consumption (``thread_to_yaml_string``),
#   * session persistence and login (``init_client`` / ``default_login``),
#   * reply helpers with rich-text facets (``reply_to_post`` and friends).
#
# NOTE: importing this module configures root logging and loads ``.env``
# (overriding variables that are already set in the environment).

import json
import logging
import os
import re
import sys
from typing import Any, Dict, List, Optional, Tuple

import dotenv
import yaml
from atproto_client import Client, Session, SessionEvent, models

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables
dotenv.load_dotenv(override=True)

# Strip fields. A list of fields to remove from a JSON object
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "facets",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]

# Maximum number of posts reply_with_thread_to_notification will send.
_MAX_THREAD_REPLIES = 15

# Text posted in place of a reply that could not be sent.
_FAILURE_NOTICE = "[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]"

# Rich-text patterns. They run over the UTF-8 *bytes* of the post because
# facet indices are byte offsets. Group 1 is the mention/URL itself; the
# non-capturing prefix only anchors it to start-of-text or a non-word byte.
_MENTION_REGEX = re.compile(
    rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
)
_URL_REGEX = re.compile(
    rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
)


def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Objects exposing ``__dict__`` are replaced by their (recursively
    converted) attribute dict; dicts and lists are converted element-wise;
    ``str``/``int``/``float``/``bool``/``None`` pass through unchanged; any
    other value is replaced by ``str(value)``.
    """
    if hasattr(obj, '__dict__'):
        # Convert objects with __dict__ to their dictionary representation
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_to_basic_types(item) for item in obj]
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    # For other types, fall back to the string representation
    return str(obj)


def _is_blank(value) -> bool:
    """Return True for None, an empty dict/list, or a whitespace-only string."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    if isinstance(value, str):
        return value.strip() == ""
    return False


def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object, in place.

    For dicts, every key listed in ``strip_field_list`` and every string key
    starting with ``"__"`` is removed; the remaining values are processed
    recursively and dropped when they end up None, empty, or blank.
    For lists, items are processed recursively and None items are removed
    (empty containers inside a list are kept).

    Args:
        obj: The dict/list structure to clean. It is mutated.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj`` that was passed in, after cleaning.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys because entries are deleted.
        for key in list(obj.keys()):
            # Non-string keys cannot be dunder metadata; guard startswith.
            is_dunder = isinstance(key, str) and key.startswith("__")
            if key in strip_field_list or is_dunder:
                del obj[key]
                continue

            cleaned = strip_fields(obj[key], strip_field_list)
            if _is_blank(cleaned):
                del obj[key]
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        # Slice-assign so callers holding a reference see the cleaned list.
        obj[:] = [
            item
            for item in (strip_fields(value, strip_field_list) for value in obj)
            if item is not None
        ]

    return obj


def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a flat list of posts.

    Only the chain of ancestors plus the anchor post itself is collected;
    replies beneath the anchor post are not included.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with 'posts' key containing the posts ordered from the oldest
        fetched ancestor down to the anchor post. Each post is a shallow
        copy of the post's attribute dict (or of the dict itself).
    """
    # Walk upwards from the anchor node, remembering every node on the way.
    lineage = []
    node = getattr(thread_data, 'thread', None)
    while node:
        lineage.append(node)
        node = getattr(node, 'parent', None)

    posts = []
    # Emit oldest ancestor first so the result reads top-down.
    for node in reversed(lineage):
        post = getattr(node, 'post', None)
        if not post:
            # e.g. a placeholder node that carries no post payload
            continue
        if hasattr(post, '__dict__'):
            posts.append(post.__dict__.copy())
        elif isinstance(post, dict):
            posts.append(post.copy())
        else:
            posts.append({})

    # Return a simple structure with posts list
    return {'posts': posts}


def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields (STRIP_FIELDS)
            for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # First flatten the thread structure to avoid deep nesting
    flattened = flatten_thread_structure(thread)

    # Convert complex objects to basic types; this builds fresh dicts/lists,
    # so the in-place stripping below never touches the caller's objects.
    basic_thread = convert_to_basic_types(flattened)

    if strip_metadata:
        cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
    else:
        cleaned_thread = basic_thread

    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def get_session(username: str) -> Optional[str]:
    """Return the saved session string for ``username``, or None if no
    ``session_<username>.txt`` file exists in the working directory."""
    try:
        with open(f"session_{username}.txt", encoding="UTF-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None


def save_session(username: str, session_string: str) -> None:
    """Write ``session_string`` to ``session_<username>.txt`` in the working
    directory, replacing any previous content."""
    with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Session-change callback: persist the session on CREATE and REFRESH."""
    logger.debug(f"Session changed: {event} {repr(session)}")
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug(f"Saving changed session for {username}")
        save_session(username, session.export())


def init_client(username: str, password: str) -> Client:
    """Create a logged-in client for ``username``.

    The PDS is taken from the ``PDS_URI`` environment variable and defaults
    to ``https://bsky.social``. A saved session is reused when one exists;
    if it cannot be reused (for example because it is stale or corrupt),
    the function falls back to a password login instead of failing.

    Args:
        username: Account handle, also used to name the session file.
        password: Account password, used when no usable session exists.

    Returns:
        The authenticated client.

    Raises:
        Whatever ``Client.login`` raises when the password login fails.
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    # Registered before login so the freshly created/refreshed session is saved.
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # A broken session file must not lock the user out; the password
            # login below creates a new session that overwrites it.
            logger.warning(
                f"Saved session for {username} could not be reused ({e}); "
                "falling back to password login"
            )

    logger.debug(f"Creating new session for {username}")
    client.login(username, password)

    return client


def default_login() -> Client:
    """Log in with ``BSKY_USERNAME`` / ``BSKY_PASSWORD`` from the environment.

    Returns:
        The authenticated client.

    Raises:
        SystemExit: With status 1 when either variable is missing.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    if username is None:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        # sys.exit rather than the site-provided exit(): always available,
        # and a non-zero status signals the configuration error to callers.
        sys.exit(1)

    if password is None:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        sys.exit(1)

    return init_client(username, password)


def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Args:
        text: The text to process

    Returns:
        The whitespace-stripped text with one pair of enclosing double
        quotes removed. Inputs shorter than two characters are returned
        unchanged.
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Re-check the length after stripping: a lone '"' both starts and ends
    # with a quote but is not a quoted string and must be left alone.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text


def _parse_facets(client: Client, text: str) -> List[Any]:
    """Build rich-text facets (mentions first, then links) for ``text``.

    Mentions whose handle cannot be resolved are skipped, so they are sent
    as plain text.
    """
    facets = []
    text_bytes = text.encode("UTF-8")

    for m in _MENTION_REGEX.finditer(text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
            if resolve_resp and hasattr(resolve_resp, 'did'):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        # Group 1 offsets exclude the anchoring prefix byte.
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            byteStart=m.start(1),
                            byteEnd=m.end(1)
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                    )
                )
        except Exception as e:
            logger.debug(f"Failed to resolve handle {handle}: {e}")
            continue

    for m in _URL_REGEX.finditer(text_bytes):
        url = m.group(1).decode("UTF-8")
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)]
            )
        )

    return facets


def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from sending the post

    Raises:
        Any exception raised by the client while sending the post.
    """
    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions and URLs)
    facets = _parse_facets(client, text)

    response = client.send_post(
        text=text,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        # An empty list is sent as "no facets", same as omitting the argument.
        facets=facets or None,
        langs=[lang] if lang else None
    )

    logger.info(f"Reply sent successfully: {response.uri}")
    return response


def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Requests up to 60 levels of parents and 10 levels of replies.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread data, or None if the request failed (the error is logged)
    """
    try:
        thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
        return thread
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None


def _notification_post_ref(notification: Any) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(uri, cid)`` from a notification given as a dict or object.

    Returns ``(None, None)`` when the notification exposes neither form.
    """
    if isinstance(notification, dict):
        return notification.get('uri'), notification.get('cid')
    if hasattr(notification, 'uri') and hasattr(notification, 'cid'):
        return notification.uri, notification.cid
    return None, None


def _resolve_thread_root(client: Client, post_uri: str, post_cid: str) -> Tuple[str, str]:
    """Return ``(root_uri, root_cid)`` of the thread containing the post.

    Falls back to the post itself when the thread cannot be fetched or the
    post has no ancestors.
    """
    thread_data = get_post_thread(client, post_uri)
    thread = getattr(thread_data, 'thread', None)
    if not thread:
        return post_uri, post_cid

    # Prefer the root reference stored in the post's own record: unlike the
    # ancestor walk below it stays correct when the fetched parent chain is
    # truncated or contains a deleted/blocked post.
    record = getattr(getattr(thread, 'post', None), 'record', None)
    declared_root = getattr(getattr(record, 'reply', None), 'root', None)
    declared_uri = getattr(declared_root, 'uri', None)
    declared_cid = getattr(declared_root, 'cid', None)
    if declared_uri and declared_cid:
        return declared_uri, declared_cid

    # Otherwise climb the fetched ancestors and keep the highest one that
    # carries a usable post.
    root_uri, root_cid = post_uri, post_cid
    current = thread
    while getattr(current, 'parent', None):
        current = current.parent
        post = getattr(current, 'post', None)
        if hasattr(post, 'uri') and hasattr(post, 'cid'):
            root_uri, root_cid = post.uri, post.cid
    return root_uri, root_cid


def _try_reply(client: Client, text: str, parent_uri: str, parent_cid: str, root_uri: str, root_cid: str, lang: Optional[str]) -> Optional[Any]:
    """Send one reply; log and return None instead of raising on failure."""
    try:
        return reply_to_post(
            client=client,
            text=text,
            reply_to_uri=parent_uri,
            reply_to_cid=parent_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )
    except Exception as e:
        logger.error(f"Error sending reply: {e}")
        return None


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
            (a dict or an object with ``uri`` and ``cid``)
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply or None if failed
    """
    try:
        post_uri, post_cid = _notification_post_ref(notification)
        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # If the thread cannot be fetched this is the post itself, i.e. a
        # direct reply.
        root_uri, root_cid = _resolve_thread_root(client, post_uri, post_cid)

        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previous one. When a message
    cannot be posted, a system failure notice is posted in its place and
    the chain continues; if the notice cannot be posted either, sending
    stops.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 15 messages, each max 300 chars)
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses for the posts that were sent (possibly including
        failure notices), or None if the input was invalid or nothing
        could be sent
    """
    try:
        # Validate input
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > _MAX_THREAD_REPLIES:
            logger.error(f"Cannot send more than {_MAX_THREAD_REPLIES} reply messages (got {len(reply_messages)})")
            return None

        post_uri, post_cid = _notification_post_ref(notification)
        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        root_uri, root_cid = _resolve_thread_root(client, post_uri, post_cid)

        # Send replies in sequence, each one parented on the previous post.
        responses = []
        parent_uri, parent_cid = post_uri, post_cid

        for i, message in enumerate(reply_messages):
            logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")

            response = _try_reply(client, message, parent_uri, parent_cid, root_uri, root_cid, lang)

            if not response:
                logger.error(f"Failed to send reply {i+1}, posting system failure message")
                response = _try_reply(client, _FAILURE_NOTICE, parent_uri, parent_cid, root_uri, root_cid, lang)
                if not response:
                    logger.error("Could not even send system failure message, stopping thread")
                    # Report what did go out rather than discarding it.
                    return responses if responses else None

            responses.append(response)
            parent_uri, parent_cid = response.uri, response.cid

        logger.info(f"Successfully sent {len(responses)} threaded replies")
        return responses

    except Exception as e:
        logger.error(f"Error sending threaded reply to notification: {e}")
        return None


if __name__ == "__main__":
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")