A digital person for Bluesky
1import os
2import logging
3from typing import Optional, Dict, Any, List
4from atproto_client import Client, Session, SessionEvent, models
5
# Configure logging: the root logger is set to INFO with a
# "timestamp - logger name - level - message" line format.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-wide logger shared by every helper in this file.
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables at import time, so the os.getenv() lookups
# performed later by the helpers in this file can see them.
# NOTE(review): override=True is presumably meant to let values from the .env
# file win over variables already set in the process environment - confirm
# against the python-dotenv documentation.
import dotenv
dotenv.load_dotenv(override=True)
15
16import yaml
17import json
18
# Strip fields. A list of field (key) names to remove from a JSON object.
# It is passed to strip_fields() by thread_to_yaml_string() when metadata
# stripping is requested; any dict key equal to one of these names is dropped
# at every nesting level.
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "facets",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]
def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    The conversion is recursive:

    - ``None``, ``str``, ``int``, ``float`` and ``bool`` values are returned
      unchanged.
    - Dicts become plain ``dict`` objects with converted values (keys are kept
      as they are).
    - Lists and tuples become plain ``list`` objects with converted items.
    - Any other object exposing ``__dict__`` is converted through its
      attribute dictionary.
    - Everything else falls back to ``str(obj)``.

    Args:
        obj: The value to convert.

    Returns:
        A structure made of dicts, lists and primitive values (or strings for
        objects that cannot be decomposed).
    """
    # Primitives and containers are checked BEFORE ``__dict__``: Python-level
    # subclasses of dict/str/int (e.g. collections.Counter) also carry an
    # instance ``__dict__``, and looking at it first would replace their real
    # content with that (usually empty) attribute dictionary.
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Tuples are emitted as lists so they serialize as plain sequences
        # instead of being flattened into their string representation.
        return [convert_to_basic_types(item) for item in obj]
    if hasattr(obj, '__dict__'):
        # Generic objects are represented by their attribute dictionary.
        return convert_to_basic_types(obj.__dict__)
    # For other types, fall back to the string representation.
    return str(obj)
80
81
def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON object, modifying it in place.

    For dicts, a key is removed when it appears in ``strip_field_list`` or is
    a string starting with ``"__"``. Remaining values are processed
    recursively, and a key is also dropped when its processed value is
    ``None``, an empty dict, an empty list, or a blank string.

    For lists, every item is processed recursively and ``None`` items are
    removed; other empty items (such as ``{}``) are kept.

    Args:
        obj: The dict, list or scalar to clean.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj`` that was passed in (dicts and lists are mutated, not
        copied); scalars are returned unchanged.
    """
    if isinstance(obj, dict):
        for field in list(obj):
            # Only string keys can be "__"-prefixed metadata; guarding with
            # isinstance avoids an AttributeError on int or other non-str keys.
            is_dunder = isinstance(field, str) and field.startswith("__")
            if is_dunder or field in strip_field_list:
                del obj[field]
                continue

            cleaned = strip_fields(obj[field], strip_field_list)
            is_empty = (
                cleaned is None
                or (isinstance(cleaned, (dict, list)) and not cleaned)
                or (isinstance(cleaned, str) and not cleaned.strip())
            )
            if is_empty:
                # Drop keys whose value carries no information after cleaning.
                del obj[field]
            else:
                obj[field] = cleaned

    elif isinstance(obj, list):
        processed = [strip_fields(item, strip_field_list) for item in obj]
        # Slice assignment keeps the caller's list object while removing Nones.
        obj[:] = [item for item in processed if item is not None]

    return obj
115
116
def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a flat list of post dicts.

    Starting at ``thread_data.thread``, the chain of ``parent`` links is
    followed and each node's ``post`` is collected, ancestors first, so the
    starting node's post comes last. Only ``parent`` links are followed.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with a 'posts' key holding the collected posts, oldest ancestor
        first. Each entry is a shallow copy of the post's ``__dict__`` (or of
        the post itself if it is a dict); posts of any other kind become ``{}``.
    """
    collected = []

    def _post_as_dict(post):
        """Return a shallow dict copy of a post, or {} if it has no dict form."""
        if hasattr(post, '__dict__'):
            return post.__dict__.copy()
        if isinstance(post, dict):
            return post.copy()
        return {}

    def _walk(node):
        """Visit the ancestors of a node first, then the node itself."""
        if not node:
            return

        # Ancestors go first so the output reads top-down.
        _walk(getattr(node, 'parent', None))

        post = getattr(node, 'post', None)
        if post:
            collected.append(_post_as_dict(post))

    # Locate the starting node, either as an attribute or via the raw __dict__.
    if hasattr(thread_data, 'thread'):
        _walk(thread_data.thread)
    elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
        _walk(thread_data.__dict__['thread'])

    return {'posts': collected}
159
160
def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Render thread data as a YAML string for LLM parsing.

    The thread is flattened with flatten_thread_structure(), reduced to basic
    types with convert_to_basic_types(), optionally cleaned with
    strip_fields() using STRIP_FIELDS, and finally dumped with yaml.dump().

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # Flatten first to avoid deep nesting, then reduce to plain dicts/lists.
    serializable = convert_to_basic_types(flatten_thread_structure(thread))

    if strip_metadata:
        # strip_fields() works in place on the freshly built structure.
        serializable = strip_fields(serializable, STRIP_FIELDS)

    return yaml.dump(
        serializable,
        default_flow_style=False,
        allow_unicode=True,
        indent=2,
    )
185
186
187
188
189
190
191
def get_session(username: str) -> Optional[str]:
    """Return the stored session string for ``username``, if one exists.

    The session is read from ``session_<username>.txt`` relative to the
    current working directory.

    Args:
        username: Account name used to build the session file name.

    Returns:
        The file's full contents, or None when the file does not exist.
    """
    session_path = f"session_{username}.txt"
    try:
        session_file = open(session_path, encoding="UTF-8")
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None

    with session_file:
        return session_file.read()
199
def save_session(username: str, session_string: str) -> None:
    """Persist the session string for ``username`` to disk.

    The session is written to ``session_<username>.txt`` relative to the
    current working directory, replacing any previous contents.

    Because the exported session is what init_client() later uses to log in
    without a password, the file is created with owner-only permissions
    (0o600) rather than the process default. A file that already exists keeps
    its current permission bits; only its contents are replaced.

    Args:
        username: Account name used to build the session file name.
        session_string: The exported session to store.
    """
    session_path = f"session_{username}.txt"
    # os.open lets us supply the creation mode; the builtin open() cannot.
    fd = os.open(session_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")
204
def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Persist the session whenever it is created or refreshed.

    Args:
        username: Account name the session belongs to (used for the file name).
        event: The session event reported by the client.
        session: The session object; its ``export()`` result is what is saved.
    """
    # NOTE(review): repr(session) may contain tokens - confirm this is
    # acceptable for debug logs.
    logger.debug(f"Session changed: {event} {repr(session)}")

    # Only CREATE and REFRESH events trigger a save; all others are ignored.
    if event not in (SessionEvent.CREATE, SessionEvent.REFRESH):
        return

    logger.debug(f"Saving changed session for {username}")
    save_session(username, session.export())
210
def init_client(username: str, password: str) -> Client:
    """Create a logged-in client, reusing a stored session when one exists.

    The PDS address is read from the ``PDS_URI`` environment variable and
    falls back to ``https://bsky.social`` (with a warning) when it is unset.
    A session-change callback is registered so created or refreshed sessions
    are written to disk via on_session_change().

    Args:
        username: Account identifier; also selects the session file.
        password: Password used only when no stored session is found.

    Returns:
        The client after ``login`` has been called on it.
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    def _persist_session(event, session):
        """Forward client session events, tagged with this username."""
        on_session_change(username, event, session)

    client = Client(pds_uri)
    client.on_session_change(_persist_session)

    stored_session = get_session(username)
    if not stored_session:
        # No saved session (or an empty file): log in with the password.
        logger.debug(f"Creating new session for {username}")
        client.login(username, password)
        return client

    logger.debug(f"Reusing existing session for {username}")
    client.login(session_string=stored_session)
    return client
236
237
def default_login() -> Client:
    """Log in using credentials taken from the environment.

    Reads ``BSKY_USERNAME`` and ``BSKY_PASSWORD`` and passes them to
    init_client(). If either is missing or empty, an error is logged and the
    process is terminated with exit status 1.

    Returns:
        The logged-in client produced by init_client().

    Raises:
        SystemExit: With code 1 when a required variable is missing or empty.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    # SystemExit is raised directly instead of calling exit(): that helper is
    # injected by the `site` module for interactive use, is not guaranteed to
    # exist, and with no argument reports success (status 0) even though this
    # is a configuration error.
    if not username:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        raise SystemExit(1)

    if not password:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        raise SystemExit(1)

    return init_client(username, password)
255
def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Inputs that are empty or shorter than two characters are returned
    untouched. Otherwise surrounding whitespace is stripped, and one leading
    plus one trailing double quote are removed when both are present.

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Require at least two characters after stripping: a lone '"' satisfies
    # both startswith and endswith, and must not collapse to an empty string.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text
280
def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Post a reply on Bluesky, attaching rich-text facets for mentions and links.

    Mentions (``@handle``) and http(s) URLs are located in the UTF-8 encoded
    text so facet offsets are byte positions. Each mention handle is resolved
    through ``get_profile``; handles that fail to resolve are logged at debug
    level and skipped. Errors from sending the post itself are not caught here.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None,
            both root values are taken from the parent
        root_cid: The CID of the root post (if replying to a reply)
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from sending the post
    """
    import re

    # Without an explicit root, the parent is itself the thread root.
    if root_uri is None:
        root_uri, root_cid = reply_to_uri, reply_to_cid

    def _strong_ref(uri, cid):
        """Build a strong reference for the given uri/cid pair."""
        return models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=uri, cid=cid))

    parent_ref = _strong_ref(reply_to_uri, reply_to_cid)
    root_ref = _strong_ref(root_uri, root_cid)

    # Matching runs over bytes so that span offsets are byte positions.
    encoded_text = text.encode("UTF-8")
    facets = []

    # Mentions: group 1 is the "@handle" itself, excluding any prefix character.
    mention_pattern = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"

    for match in re.finditer(mention_pattern, encoded_text):
        handle = match.group(1)[1:].decode("UTF-8")  # drop the leading "@"
        span_start, span_end = match.span(1)
        try:
            # Resolve the handle to a DID through the profile endpoint.
            profile = client.app.bsky.actor.get_profile({'actor': handle})
            if profile and hasattr(profile, 'did'):
                byte_slice = models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=span_start,
                    byteEnd=span_end
                )
                mention = models.AppBskyRichtextFacet.Mention(did=profile.did)
                facets.append(
                    models.AppBskyRichtextFacet.Main(index=byte_slice, features=[mention])
                )
        except Exception as e:
            logger.debug(f"Failed to resolve handle {handle}: {e}")
            continue

    # URLs: group 1 is the URL itself, excluding any prefix character.
    url_pattern = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"

    for match in re.finditer(url_pattern, encoded_text):
        url = match.group(1).decode("UTF-8")
        span_start, span_end = match.span(1)
        byte_slice = models.AppBskyRichtextFacet.ByteSlice(
            byteStart=span_start,
            byteEnd=span_end
        )
        link = models.AppBskyRichtextFacet.Link(uri=url)
        facets.append(
            models.AppBskyRichtextFacet.Main(index=byte_slice, features=[link])
        )

    # The facets argument is only passed when at least one facet was found.
    post_kwargs = {
        "text": text,
        "reply_to": models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        "langs": [lang] if lang else None,
    }
    if facets:
        post_kwargs["facets"] = facets

    response = client.send_post(**post_kwargs)

    logger.info(f"Reply sent successfully: {response.uri}")
    return response
372
373
def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Fetch the thread that contains a post.

    The request asks for ``parent_height`` 60 and ``depth`` 10. Any exception
    raised by the call is logged and turned into a ``None`` result.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread data or None if the request failed
    """
    request_params = {'uri': uri, 'parent_height': 60, 'depth': 10}
    try:
        return client.app.bsky.feed.get_post_thread(request_params)
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None
391
392
def _resolve_reply_refs(notification: Any) -> Optional[tuple]:
    """Extract the parent and root references needed to reply to a notification.

    Both dict-shaped and attribute-shaped notifications are supported. The
    root is taken from the notification record's ``reply.root`` when it has
    both a uri and a cid; otherwise the notification's own post is the root.

    Returns:
        ``(post_uri, post_cid, root_uri, root_cid)``, or None when the
        notification has no usable uri/cid.
    """
    if isinstance(notification, dict):
        post_uri = notification.get('uri')
        post_cid = notification.get('cid')
        record = notification.get('record', {})
        reply_info = record.get('reply') if isinstance(record, dict) else None
    else:
        post_uri = getattr(notification, 'uri', None)
        post_cid = getattr(notification, 'cid', None)
        reply_info = getattr(getattr(notification, 'record', None), 'reply', None)

    if not post_uri or not post_cid:
        return None

    # Default: the post being replied to is itself the thread root.
    root_uri, root_cid = post_uri, post_cid

    if reply_info:
        if isinstance(reply_info, dict):
            root_ref = reply_info.get('root')
            if isinstance(root_ref, dict):
                candidate = (root_ref.get('uri'), root_ref.get('cid'))
            else:
                candidate = (None, None)
        else:
            root_ref = getattr(reply_info, 'root', None)
            candidate = (getattr(root_ref, 'uri', None), getattr(root_ref, 'cid', None))

        # Use the root only when BOTH parts are present, so a root uri is never
        # paired with the parent's cid (or vice versa).
        if candidate[0] and candidate[1]:
            root_uri, root_cid = candidate

    return post_uri, post_cid, root_uri, root_cid


def _try_reply(client: Client, text: str, parent_uri: str, parent_cid: str, root_uri: str, root_cid: str, lang: Optional[str]) -> Any:
    """Send one reply via reply_to_post(); log and return None on any error."""
    try:
        return reply_to_post(
            client=client,
            text=text,
            reply_to_uri=parent_uri,
            reply_to_cid=parent_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )
    except Exception as e:
        logger.error(f"Error sending reply: {e}")
        return None


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
            (either a dict or an object with uri/cid/record attributes)
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply or None if failed
    """
    try:
        refs = _resolve_reply_refs(notification)
        if refs is None:
            logger.error("Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = refs

        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previous one. If a message cannot
    be posted, a "[SYSTEM FAILURE ...]" placeholder is posted in its place and
    the chain continues from that placeholder. If the placeholder cannot be
    posted either, the thread stops and whatever was sent so far is returned.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (at most 15; only the count is
            validated here, not the length of each text)
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses from sending the replies, or None if nothing could
        be sent or the input was invalid
    """
    try:
        # Validate input
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > 15:
            logger.error(f"Cannot send more than 15 reply messages (got {len(reply_messages)})")
            return None

        refs = _resolve_reply_refs(notification)
        if refs is None:
            logger.error("Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = refs

        # Send replies in sequence, each one replying to the previous post.
        responses = []
        parent_uri, parent_cid = post_uri, post_cid

        for i, message in enumerate(reply_messages):
            logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")

            # reply_to_post() reports failure by raising, so failures are
            # captured per message; otherwise the fallback below is unreachable.
            response = _try_reply(client, message, parent_uri, parent_cid, root_uri, root_cid, lang)

            if not response:
                logger.error(f"Failed to send reply {i+1}, posting system failure message")
                response = _try_reply(
                    client,
                    "[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                    parent_uri,
                    parent_cid,
                    root_uri,
                    root_cid,
                    lang,
                )
                if not response:
                    logger.error("Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The post just sent becomes the parent of the next reply.
            parent_uri, parent_cid = response.uri, response.cid

        logger.info(f"Successfully sent {len(responses)} threaded replies")
        return responses

    except Exception as e:
        logger.error(f"Error sending threaded reply to notification: {e}")
        return None
596
597
def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    A custom acknowledgment record is written instead of a standard Bluesky
    like, allowing void to track which posts it has engaged with. The record
    is created with a direct HTTP POST to the ``com.atproto.repo.createRecord``
    endpoint of the host named by ``PDS_URI`` (default ``https://bsky.social``),
    authenticated with the access token taken from the client's session.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The decoded JSON response from creating the record, or None if the
        session could not be read or any step raised an exception
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Session details may live on a private _session attribute or directly
        # on the client; try both before giving up.
        session = getattr(client, '_session', None)
        if session:
            access_token, user_did = session.access_jwt, session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not (access_token and user_did):
            logger.error("Missing access token or DID from session")
            return None

        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        # UTC timestamp in ISO-8601 form with a trailing "Z" instead of "+00:00".
        created_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        payload = {
            "repo": user_did,
            "collection": "stream.thought.ack",
            "record": {
                "$type": "stream.thought.ack",
                "subject": {
                    "uri": post_uri,
                    "cid": post_cid
                },
                "createdAt": created_at,
                "note": note,  # serialized as null when no note is provided
            },
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10,
        )
        response.raise_for_status()
        result = response.json()

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None
672
673
if __name__ == "__main__":
    # When run as a script, log in with the credentials from the environment
    # (default_login() exits the process if they are missing).
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")