A digital person for Bluesky.
42
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 6098130687ebe189b37e30ed8d7b23bd14075761 578 lines 20 kB view raw
import os
import re
import sys
import logging
from typing import Optional, Dict, Any, List, Tuple
from atproto_client import Client, Session, SessionEvent, models

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables (values from .env override the process environment)
import dotenv
dotenv.load_dotenv(override=True)

import yaml
import json

# Keys removed by strip_fields() when thread_to_yaml_string() produces its
# "cleaner output" for the LLM: identifiers, engagement counters, viewer
# state, media metadata and similar bookkeeping fields.
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "facets",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]

# Rich-text detection patterns. They run over UTF-8 *bytes* because facet
# indices are byte offsets. Group 1 is the mention/URL itself; the leading
# (?:^|[$|\W]) lets a match start at the beginning of the text or after any
# non-word byte, and is excluded from the facet range.
_MENTION_REGEX = re.compile(
    rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
)
_URL_REGEX = re.compile(
    rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
)

# Posted in place of a threaded reply that could not be sent.
_SYSTEM_FAILURE_TEXT = "[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]"


def convert_to_basic_types(obj):
    """Recursively convert complex objects to basic types for JSON/YAML serialization.

    - Objects exposing ``__dict__`` (e.g. SDK model instances) are converted
      through that dictionary.
    - Dicts are rebuilt with converted values; lists and tuples become lists.
    - ``str``, ``int``, ``float``, ``bool`` and ``None`` are returned unchanged.
    - Anything else is converted with ``str()``.
    """
    if hasattr(obj, "__dict__"):
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_to_basic_types(item) for item in obj]
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    return str(obj)


def _is_empty_value(value: Any) -> bool:
    """Return True for values strip_fields() prunes: None, {}, [] or a blank string."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    if isinstance(value, str):
        return value.strip() == ""
    return False


def strip_fields(obj, strip_field_list):
    """Recursively strip unwanted keys and empty values from a JSON-like object.

    The object is mutated in place and also returned.

    - In dicts, keys listed in ``strip_field_list`` and string keys starting
      with ``"__"`` are removed. Remaining values are then cleaned recursively,
      and a key is dropped if its cleaned value is None, an empty dict or list,
      or a whitespace-only string.
    - In lists, items are cleaned recursively and ``None`` items are removed.

    Args:
        obj: A structure of dicts/lists/scalars (see convert_to_basic_types).
        strip_field_list: Collection of key names to remove at any depth.

    Returns:
        The same ``obj``, cleaned.
    """
    if isinstance(obj, dict):
        # Drop listed fields and dunder keys (e.g. pydantic internals) first.
        # Non-string keys are possible in arbitrary dicts, so guard startswith.
        unwanted = [
            key
            for key in obj
            if key in strip_field_list or (isinstance(key, str) and key.startswith("__"))
        ]
        for key in unwanted:
            del obj[key]

        # Then clean what survived, pruning values that end up empty.
        for key, value in list(obj.items()):
            cleaned = strip_fields(value, strip_field_list)
            if _is_empty_value(cleaned):
                del obj[key]
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        cleaned_items = [strip_fields(item, strip_field_list) for item in obj]
        obj[:] = [item for item in cleaned_items if item is not None]

    return obj


def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a list of posts.

    Only the requested post and its ancestors (reached via ``parent`` links)
    are collected; ``replies`` below the requested post are not traversed.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with a 'posts' key holding shallow copies of each post's fields,
        ordered from the topmost fetched ancestor down to the requested post.
    """
    # Walk upward from the requested node, then reverse into root-first order.
    chain = []
    node = getattr(thread_data, "thread", None)
    while node:
        chain.append(node)
        node = getattr(node, "parent", None)

    posts = []
    for node in reversed(chain):
        post = getattr(node, "post", None)
        if not post:
            # e.g. nodes without a post body are skipped
            continue
        if hasattr(post, "__dict__"):
            posts.append(post.__dict__.copy())
        elif isinstance(post, dict):
            posts.append(post.copy())
        else:
            posts.append({})

    return {"posts": posts}


def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip STRIP_FIELDS and empty values for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # Flatten first so the YAML is a list of posts rather than deep nesting.
    flattened = flatten_thread_structure(thread)

    # convert_to_basic_types builds fresh containers, so strip_fields' in-place
    # mutation below does not touch the SDK objects.
    basic_thread = convert_to_basic_types(flattened)

    cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS) if strip_metadata else basic_thread

    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def _session_path(username: str) -> str:
    """Return the path of the on-disk session file for ``username``."""
    return f"session_{username}.txt"


def get_session(username: str) -> Optional[str]:
    """Return the saved session string for ``username``, or None if none is stored."""
    try:
        with open(_session_path(username), encoding="UTF-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None


def save_session(username: str, session_string: str) -> None:
    """Persist ``session_string`` for ``username``.

    The session string contains credentials for the account, so a newly
    created file is given owner-only permissions (0o600). An existing file
    keeps its current permissions.
    """
    fd = os.open(_session_path(username), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Client callback: persist the session whenever it is created or refreshed."""
    # Log only the event; the session repr can include tokens.
    logger.debug(f"Session changed: {event}")
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug(f"Saving changed session for {username}")
        save_session(username, session.export())


def init_client(username: str, password: str) -> Client:
    """Create a logged-in Client for ``username``.

    The PDS comes from the PDS_URI environment variable (default:
    https://bsky.social). A saved session is reused when present. If it cannot
    be used (e.g. it has expired), the client falls back to a fresh password
    login. Session changes are persisted via on_session_change.
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # A stale/invalid stored session should not prevent startup.
            logger.warning(
                f"Stored session for {username} could not be reused ({e}); logging in with password"
            )

    logger.debug(f"Creating new session for {username}")
    client.login(username, password)
    return client


def default_login() -> Client:
    """Log in with BSKY_USERNAME / BSKY_PASSWORD from the environment.

    Exits the process with status 1 if either variable is missing or empty.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    if not username:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        sys.exit(1)

    if not password:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        sys.exit(1)

    return init_client(username, password)


def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Surrounding whitespace is stripped for inputs of length >= 2.

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Re-check the length after stripping: a lone '"' both starts and ends
    # with '"' and must not be turned into an empty string.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text


def _mention_facets(client: Client, text_bytes: bytes) -> List[Any]:
    """Build Mention facets for each @handle in ``text_bytes`` that resolves to a DID.

    Handles whose lookup fails are skipped and logged at debug level.
    """
    facets = []
    for m in _MENTION_REGEX.finditer(text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # drop the leading '@'
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
            if resolve_resp and hasattr(resolve_resp, 'did'):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        # Facet range is group 1 only (includes '@', excludes the prefix byte)
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            byteStart=m.start(1),
                            byteEnd=m.end(1)
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                    )
                )
        except Exception as e:
            logger.debug(f"Failed to resolve handle {handle}: {e}")
    return facets


def _url_facets(text_bytes: bytes) -> List[Any]:
    """Build Link facets for each http(s) URL found in ``text_bytes``."""
    facets = []
    for m in _URL_REGEX.finditer(text_bytes):
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=m.group(1).decode("UTF-8"))]
            )
        )
    return facets


def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support (mentions and URLs).

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from client.send_post (exposes ``uri`` and ``cid``)

    Raises:
        Whatever client.send_post raises on failure; this function does not
        catch send errors.
    """
    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Facet indices are byte offsets into the UTF-8 encoding.
    text_bytes = text.encode("UTF-8")
    facets = _mention_facets(client, text_bytes) + _url_facets(text_bytes)

    response = client.send_post(
        text=text,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        facets=facets or None,  # None is send_post's default (no facets)
        langs=[lang] if lang else None
    )

    logger.info(f"Reply sent successfully: {response.uri}")
    return response


def get_post_thread(client: Client, uri: str, parent_height: int = 60, depth: int = 10) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post
        parent_height: How many ancestor posts to fetch
        depth: How many levels of replies to fetch

    Returns:
        The thread data or None if the request failed
    """
    try:
        return client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': parent_height, 'depth': depth})
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None


def _field(obj: Any, name: str) -> Any:
    """Read ``name`` as a dict key or attribute of ``obj``; None if absent (or obj is None)."""
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


def _notification_ref(notification: Any) -> Tuple[Optional[str], Optional[str]]:
    """Return the (uri, cid) of the post a notification refers to (dict or object form)."""
    return _field(notification, 'uri'), _field(notification, 'cid')


def _find_thread_root(client: Client, post_uri: str, post_cid: str) -> Tuple[str, str]:
    """Return (root_uri, root_cid) of the thread containing ``post_uri``.

    The post record's own ``reply.root`` reference is preferred. It stays
    correct when the thread is deeper than the fetched parent_height, or when
    an ancestor is deleted or blocked. If it is unavailable, the topmost
    fetched ancestor with a post is used. If the thread cannot be fetched, the
    post itself is treated as the root.
    """
    thread_data = get_post_thread(client, post_uri)
    thread = _field(thread_data, 'thread') if thread_data else None
    if not thread:
        return post_uri, post_cid

    root_ref = _field(_field(_field(_field(thread, 'post'), 'record'), 'reply'), 'root')
    ref_uri, ref_cid = _field(root_ref, 'uri'), _field(root_ref, 'cid')
    if ref_uri and ref_cid:
        return ref_uri, ref_cid

    # Fallback: walk up the fetched ancestors.
    root_uri, root_cid = post_uri, post_cid
    current = thread
    while _field(current, 'parent'):
        current = _field(current, 'parent')
        post = _field(current, 'post')
        if _field(post, 'uri') and _field(post, 'cid'):
            root_uri, root_cid = _field(post, 'uri'), _field(post, 'cid')
    return root_uri, root_cid


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications (dict or object with uri/cid)
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply, or None if anything failed (errors are logged)
    """
    try:
        post_uri, post_cid = _notification_ref(notification)
        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        root_uri, root_cid = _find_thread_root(client, post_uri, post_cid)

        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 4).

    Each message replies to the previous one. If a message cannot be sent, a
    system-failure notice is posted in its place and the chain continues
    under that notice. If even the notice fails, sending stops.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 4 messages, each max 300 chars; length is not checked here)
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses for the posts that were sent (including failure
        notices), or None if validation failed or nothing could be sent
    """
    try:
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > 4:
            logger.error(f"Cannot send more than 4 reply messages (got {len(reply_messages)})")
            return None

        post_uri, post_cid = _notification_ref(notification)
        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        root_uri, root_cid = _find_thread_root(client, post_uri, post_cid)

        responses = []
        parent_uri, parent_cid = post_uri, post_cid
        total = len(reply_messages)

        for i, message in enumerate(reply_messages, start=1):
            logger.info(f"Sending reply {i}/{total}: {message[:50]}...")

            # reply_to_post raises on failure, so failures must be caught here
            # for the failure-notice path to ever run.
            try:
                response = reply_to_post(
                    client=client,
                    text=message,
                    reply_to_uri=parent_uri,
                    reply_to_cid=parent_cid,
                    root_uri=root_uri,
                    root_cid=root_cid,
                    lang=lang
                )
            except Exception as e:
                logger.error(f"Failed to send reply {i} ({e}), posting system failure message")
                response = None

            if not response:
                try:
                    response = reply_to_post(
                        client=client,
                        text=_SYSTEM_FAILURE_TEXT,
                        reply_to_uri=parent_uri,
                        reply_to_cid=parent_cid,
                        root_uri=root_uri,
                        root_cid=root_cid,
                        lang=lang
                    )
                except Exception as e:
                    logger.error(f"Could not send system failure message ({e})")
                    response = None
                if not response:
                    logger.error("Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The next message (if any) replies to the post just sent.
            parent_uri, parent_cid = response.uri, response.cid

        logger.info(f"Successfully sent {len(responses)} threaded replies")
        return responses

    except Exception as e:
        logger.error(f"Error sending threaded reply to notification: {e}")
        return None


if __name__ == "__main__":
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")