# A digital person for Bluesky.
1import os
2import logging
3import uuid
4import time
5from typing import Optional, Dict, Any, List
6from atproto_client import Client, Session, SessionEvent, models
7
# Configure logging: INFO level, each record shows timestamp, logger name,
# severity and message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("bluesky_session_handler")
13
14# Load the environment variables
15import dotenv
16dotenv.load_dotenv(override=True)
17
18import yaml
19import json
20
# Keys that strip_fields() removes (at every nesting level) when
# thread_to_yaml_string() is called with strip_metadata=True.
STRIP_FIELDS = [
    "cid", "rev", "did", "uri", "langs",
    "threadgate", "py_type", "labels", "avatar", "viewer",
    "indexed_at", "tags", "associated", "thread_context", "aspect_ratio",
    "thumb", "fullsize", "root", "created_at", "verification",
    "like_count", "quote_count", "reply_count", "repost_count",
    "embedding_disabled", "thread_muted", "reply_disabled", "pinned",
    "like", "repost",
    "blocked_by", "blocking", "blocking_by_list",
    "followed_by", "following", "known_followers",
    "muted", "muted_by_list", "root_author_like",
    "entities", "ref", "mime_type", "size",
]
def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Conversion rules, checked in this order:
      * objects exposing ``__dict__`` -> their attribute dict, converted recursively
      * dicts -> dict with every value converted (keys are left untouched)
      * lists and tuples -> list with every item converted
      * str / int / float / bool / None -> returned unchanged
      * anything else -> ``str(obj)``

    Args:
        obj: Any Python value.

    Returns:
        A structure made only of dicts, lists and scalar values (dict keys
        excepted, which are passed through as-is).
    """
    if hasattr(obj, '__dict__'):
        # Arbitrary objects are represented by their attribute dictionary
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Tuples are sequences too; stringifying them would emit "(1, 2)"
        # instead of a real YAML/JSON list.
        return [convert_to_basic_types(item) for item in obj]
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # Last resort for types with no structural representation
    return str(obj)
81
82
def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object, in place.

    For dicts: removes every key listed in ``strip_field_list`` and every
    string key starting with ``"__"``, then cleans the remaining values
    recursively and drops entries whose cleaned value is ``None``, an empty
    dict, an empty list or a whitespace-only string.

    For lists: cleans every item recursively and drops items that are
    ``None`` (empty dicts/lists/strings inside a list are kept).

    Args:
        obj: The structure to clean. Dicts and lists are mutated in place.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj`` that was passed in (other types are returned as-is).
    """
    if isinstance(obj, dict):
        # Drop unwanted keys first. Only string keys can carry the "__"
        # prefix; calling startswith on e.g. an int key would raise.
        for field in list(obj.keys()):
            is_dunder = isinstance(field, str) and field.startswith("__")
            if field in strip_field_list or is_dunder:
                del obj[field]

        # Clean the surviving values and prune the ones that end up empty
        for key, value in list(obj.items()):
            cleaned = strip_fields(value, strip_field_list)
            is_blank = (
                cleaned is None
                or (isinstance(cleaned, (dict, list)) and len(cleaned) == 0)
                or (isinstance(cleaned, str) and cleaned.strip() == "")
            )
            if is_blank:
                del obj[key]
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        cleaned_items = [strip_fields(item, strip_field_list) for item in obj]
        # Slice-assign so callers holding a reference to the list see the result
        obj[:] = [item for item in cleaned_items if item is not None]

    return obj
116
117
def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a flat list of post dicts.

    Starting at ``thread_data.thread``, the ``parent`` links are followed
    upwards and the posts are emitted oldest ancestor first, ending with the
    starting node's own post. Only the parent chain is walked.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with a single 'posts' key holding the list of post dicts
    """
    collected = []

    def _as_plain_dict(post):
        """Shallow-copy a post into a dict; unsupported types become {}."""
        if hasattr(post, '__dict__'):
            return post.__dict__.copy()
        if isinstance(post, dict):
            return post.copy()
        return {}

    def _visit(node):
        """Emit all ancestors of node first, then node's own post."""
        if not node:
            return

        ancestor = getattr(node, 'parent', None)
        if ancestor:
            _visit(ancestor)

        post = getattr(node, 'post', None)
        if post:
            collected.append(_as_plain_dict(post))

    # Find the entry node either as an attribute or in the instance dict
    if hasattr(thread_data, 'thread'):
        _visit(thread_data.thread)
    elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
        _visit(thread_data.__dict__['thread'])

    return {'posts': collected}
160
161
def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Render thread data as a YAML string for LLM parsing.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: When True, keys listed in STRIP_FIELDS are removed
            before dumping

    Returns:
        YAML-formatted string representation of the thread
    """
    # Flatten first to avoid deep nesting, then reduce to plain dicts/lists/scalars
    plain_thread = convert_to_basic_types(flatten_thread_structure(thread))

    # strip_fields works in place and hands back the same structure
    output = strip_fields(plain_thread, STRIP_FIELDS) if strip_metadata else plain_thread

    return yaml.dump(output, default_flow_style=False, allow_unicode=True, indent=2)
186
187
188
189
190
191
192
def get_session(username: str) -> Optional[str]:
    """Return the contents of ``session_<username>.txt``, or None if the file does not exist."""
    session_path = f"session_{username}.txt"
    try:
        session_file = open(session_path, encoding="UTF-8")
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None
    with session_file:
        return session_file.read()
200
def save_session(username: str, session_string: str) -> None:
    """Write ``session_string`` to ``session_<username>.txt``, replacing any previous content."""
    session_path = f"session_{username}.txt"
    with open(session_path, mode="w", encoding="UTF-8") as session_file:
        session_file.write(session_string)
    logger.debug(f"Session saved for {username}")
205
def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Persist the exported session whenever it is created or refreshed; other events are only logged."""
    logger.debug(f"Session changed: {event} {repr(session)}")
    if event not in (SessionEvent.CREATE, SessionEvent.REFRESH):
        return
    logger.debug(f"Saving changed session for {username}")
    save_session(username, session.export())
211
def init_client(username: str, password: str) -> Client:
    """
    Build a logged-in client, reusing a saved session when one exists.

    The PDS is taken from the PDS_URI environment variable and falls back to
    https://bsky.social (with a warning) when it is unset. Session changes are
    forwarded to on_session_change so they get written to disk.

    Args:
        username: Account handle; also selects the session file
        password: Password used only when no saved session is found

    Returns:
        The authenticated client
    """
    configured_pds = os.getenv("PDS_URI")
    if configured_pds is not None:
        pds_uri = configured_pds
    else:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    def _persist_session(event, session):
        # Bind the username so the client callback only needs (event, session)
        on_session_change(username, event, session)

    client = Client(pds_uri)
    client.on_session_change(_persist_session)

    saved_session = get_session(username)
    if not saved_session:
        logger.debug(f"Creating new session for {username}")
        client.login(username, password)
    else:
        logger.debug(f"Reusing existing session for {username}")
        client.login(session_string=saved_session)

    return client
237
238
def default_login() -> Client:
    """
    Login using configuration from config.yaml or environment variables.

    Reads 'username', 'password' and the optional 'pds_uri' (default
    https://bsky.social) from config_loader.get_bluesky_config().

    Returns:
        The authenticated client

    Raises:
        SystemExit: with exit code 1 when the configuration cannot be loaded
            or the login itself fails (the cause is logged first).
    """
    try:
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()

        username = bluesky_config['username']
        password = bluesky_config['password']
        pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social')
    except Exception as e:
        logger.error(f"Failed to load Bluesky configuration: {e}")
        logger.error("Please check your config.yaml file or environment variables")
        # SystemExit rather than the site-provided exit() helper, which is
        # not guaranteed to exist (python -S, frozen builds).
        raise SystemExit(1) from e

    logger.info(f"Logging into Bluesky as {username} via {pds_uri}")

    try:
        # Use pds_uri from config
        client = Client(base_url=pds_uri)
        client.login(username, password)
    except Exception as e:
        # Reported separately so a network/credential problem is not
        # mislabelled as a configuration-loading failure.
        logger.error(f"Failed to log into Bluesky as {username}: {e}")
        logger.error("Please check your config.yaml file or environment variables")
        raise SystemExit(1) from e

    return client
260
def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Inputs that are empty or shorter than two characters are returned
    unchanged; all other inputs are returned with surrounding whitespace
    stripped.

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    stripped = text.strip()

    # Re-check the length after stripping: a lone '"' is both the first and
    # the last character and must not be treated as a quoted empty string.
    if len(stripped) >= 2 and stripped.startswith('"') and stripped.endswith('"'):
        return stripped[1:-1]

    return stripped
285
def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    The text is scanned for @mentions, http(s) URLs and #hashtags; each match
    becomes a facet whose range is expressed in UTF-8 byte offsets. Mentions
    whose handle cannot be resolved are logged and left as plain text.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). Replaced by
            reply_to_cid whenever root_uri is None
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        The response from sending the post

    Raises:
        Exception: whatever client.send_post raises is logged and re-raised.
    """
    import re

    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    # Enhanced logging with structured data
    logger.info(f"[{correlation_id}] Starting reply_to_post", extra={
        'correlation_id': correlation_id,
        'text_length': len(text),
        'text_preview': text[:100] + '...' if len(text) > 100 else text,
        'reply_to_uri': reply_to_uri,
        'root_uri': root_uri,
        'lang': lang
    })

    start_time = time.time()

    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions, URLs and hashtags). All offsets are
    # taken from the encoded bytes, not from the str.
    facets = []
    text_bytes = text.encode("UTF-8")
    mentions_found = []
    urls_found = []
    hashtags_found = []

    logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)")

    # Parse mentions - group 1 is the handle including '@', so its span
    # excludes the optional one-character prefix
    mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"

    for m in re.finditer(mention_regex, text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        mentions_found.append(handle)
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
            if resolve_resp and hasattr(resolve_resp, 'did'):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            byteStart=m.start(1),
                            byteEnd=m.end(1)
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                    )
                )
                logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}")
        except Exception as e:
            # Best effort: an unresolvable handle stays as plain text
            logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}")

    # Parse URLs - group 1 is the URL itself, without the optional prefix
    url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"

    for m in re.finditer(url_regex, text_bytes):
        url = m.group(1).decode("UTF-8")
        urls_found.append(url)
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)]
            )
        )
        logger.debug(f"[{correlation_id}] Found URL: {url}")

    # Parse hashtags - group 1 is the tag text without '#'
    hashtag_regex = rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)"

    for m in re.finditer(hashtag_regex, text_bytes):
        tag = m.group(1).decode("UTF-8")  # Get tag without # prefix
        hashtags_found.append(tag)
        # The '#' always sits directly before group 1. Anchoring there keeps
        # the facet off the prefix byte for every allowed prefix (space, '$',
        # '|', newline, tab, ...), not just space and '$'.
        tag_start = m.start(1) - 1
        tag_end = m.end(1)
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=tag_start,
                    byteEnd=tag_end
                ),
                features=[models.AppBskyRichtextFacet.Tag(tag=tag)]
            )
        )
        logger.debug(f"[{correlation_id}] Found hashtag: #{tag}")

    logger.debug(f"[{correlation_id}] Facet parsing complete", extra={
        'correlation_id': correlation_id,
        'mentions_count': len(mentions_found),
        'mentions': mentions_found,
        'urls_count': len(urls_found),
        'urls': urls_found,
        'hashtags_count': len(hashtags_found),
        'hashtags': hashtags_found,
        'total_facets': len(facets)
    })

    # Send the reply with facets if any were found
    logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={
        'correlation_id': correlation_id,
        'has_facets': bool(facets),
        'facet_count': len(facets),
        'lang': lang
    })

    try:
        send_kwargs = {
            'text': text,
            'reply_to': models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
            'langs': [lang] if lang else None,
        }
        # Only pass facets when there are some, as before
        if facets:
            send_kwargs['facets'] = facets
        response = client.send_post(**send_kwargs)

        # Calculate response time
        response_time = time.time() - start_time

        # Extract a short identifier for user-friendly logging.
        # at://did:plc:xxx/app.bsky.feed.post/rkey splits into
        # ['at:', '', did, collection, rkey], so the rkey needs 5 parts.
        post_url = None
        if hasattr(response, 'uri') and response.uri:
            uri_parts = response.uri.split('/') if isinstance(response.uri, str) else []
            if len(uri_parts) >= 5 and uri_parts[3] == 'app.bsky.feed.post':
                # We'd need to resolve DID to handle, but for now just use the rkey
                post_url = f"bsky://post/{uri_parts[4]}"

        logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" +
                    (f" - URL: {post_url}" if post_url else ""), extra={
            'correlation_id': correlation_id,
            'response_time': round(response_time, 3),
            'post_uri': response.uri,
            'post_url': post_url,
            'post_cid': getattr(response, 'cid', None),
            'text_length': len(text)
        })

        return response

    except Exception as e:
        response_time = time.time() - start_time
        logger.error(f"[{correlation_id}] Failed to send reply", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__,
            'response_time': round(response_time, 3),
            'text_length': len(text)
        })
        raise
482
483
def get_post_thread(client: Client, uri: str, parent_height: int = 60, depth: int = 10) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post
        parent_height: Value sent as the request's 'parent_height' parameter
            (defaults to 60, the previously hard-coded value)
        depth: Value sent as the request's 'depth' parameter
            (defaults to 10, the previously hard-coded value)

    Returns:
        The thread response from the API, or None if the request raised
        (the error is logged)
    """
    try:
        return client.app.bsky.feed.get_post_thread(
            {'uri': uri, 'parent_height': parent_height, 'depth': depth}
        )
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None
501
502
def _resolve_reply_targets(notification: Any):
    """
    Work out the parent and root references needed to reply to a notification.

    Accepts either a dict or an object exposing ``uri``/``cid`` (and optionally
    ``record.reply.root``). The root is taken from the notification's reply
    info only when that root carries both a uri and a cid; otherwise the
    notification's own post is treated as the root, so the returned root
    uri/cid always belong to the same post.

    Returns:
        Tuple ``(post_uri, post_cid, root_uri, root_cid)``, or None when the
        notification has no usable uri/cid.
    """
    if isinstance(notification, dict):
        post_uri = notification.get('uri')
        post_cid = notification.get('cid')
        record = notification.get('record', {})
        reply_info = record.get('reply') if isinstance(record, dict) else None
    elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
        post_uri = notification.uri
        post_cid = notification.cid
        reply_info = None
        if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
            reply_info = notification.record.reply
    else:
        return None

    if not post_uri or not post_cid:
        return None

    # Default: no reply info means this post IS the root
    root_uri, root_cid = post_uri, post_cid

    if reply_info:
        if isinstance(reply_info, dict):
            root_ref = reply_info.get('root')
            candidate_uri = root_ref.get('uri') if isinstance(root_ref, dict) else None
            candidate_cid = root_ref.get('cid') if isinstance(root_ref, dict) else None
        else:
            root_ref = getattr(reply_info, 'root', None)
            candidate_uri = getattr(root_ref, 'uri', None)
            candidate_cid = getattr(root_ref, 'cid', None)

        # Use the root only as a complete pair; mixing a root uri with the
        # post's cid (or vice versa) would reference a non-existent record.
        if candidate_uri and candidate_cid:
            root_uri, root_cid = candidate_uri, candidate_cid

    return post_uri, post_cid, root_uri, root_cid


def _send_reply_or_none(**reply_kwargs):
    """Call reply_to_post, returning None instead of raising when the post fails."""
    try:
        return reply_to_post(**reply_kwargs)
    except Exception as e:
        logger.error(f"[{reply_kwargs.get('correlation_id')}] reply_to_post raised: {e}")
        return None


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
            (dict or object with uri/cid attributes)
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        The response from sending the reply or None if failed
    """
    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={
        'correlation_id': correlation_id,
        'reply_length': len(reply_text),
        'lang': lang
    })

    try:
        targets = _resolve_reply_targets(notification)
        if targets is None:
            logger.error("Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = targets

        # Reply to the notification
        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang,
            correlation_id=correlation_id
        )

    except Exception as e:
        logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__
        })
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previous one. When a message
    cannot be posted, a "[SYSTEM FAILURE ...]" placeholder is posted in its
    place and the thread continues; if even the placeholder fails, sending
    stops and the replies posted so far are returned.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 15 messages, each max 300 chars)
        lang: Language code for the posts (defaults to "en-US")
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        List of responses from sending the replies or None if failed
    """
    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    # Normalise once so None/empty input is reported instead of crashing len()
    messages = list(reply_messages) if reply_messages else []
    message_count = len(messages)

    try:
        logger.info(f"[{correlation_id}] Starting threaded reply", extra={
            'correlation_id': correlation_id,
            'message_count': message_count,
            'total_length': sum(len(msg) for msg in messages),
            'lang': lang
        })

        # Validate input
        if message_count == 0:
            logger.error(f"[{correlation_id}] Reply messages list cannot be empty")
            return None
        if message_count > 15:
            logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {message_count})")
            return None

        targets = _resolve_reply_targets(notification)
        if targets is None:
            logger.error("Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = targets

        # Send replies in sequence, creating a thread
        responses = []
        current_parent_uri = post_uri
        current_parent_cid = post_cid

        for i, message in enumerate(messages):
            thread_correlation_id = f"{correlation_id}-{i+1}"
            logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{message_count}: {message[:50]}...")

            # reply_to_post raises on failure, so it is wrapped to make the
            # fallback below reachable and to keep the replies already sent.
            response = _send_reply_or_none(
                client=client,
                text=message,
                reply_to_uri=current_parent_uri,
                reply_to_cid=current_parent_cid,
                root_uri=root_uri,
                root_cid=root_cid,
                lang=lang,
                correlation_id=thread_correlation_id
            )

            if not response:
                logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message")
                # Try to post a system failure message
                response = _send_reply_or_none(
                    client=client,
                    text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                    reply_to_uri=current_parent_uri,
                    reply_to_cid=current_parent_cid,
                    root_uri=root_uri,
                    root_cid=root_cid,
                    lang=lang,
                    correlation_id=f"{thread_correlation_id}-FAIL"
                )
                if not response:
                    logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The next message (if any) replies to whatever was just posted
            current_parent_uri = response.uri
            current_parent_cid = response.cid

        logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={
            'correlation_id': correlation_id,
            'replies_sent': len(responses),
            'replies_requested': message_count
        })
        return responses

    except Exception as e:
        logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__,
            'message_count': message_count
        })
        return None
746
747
def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string with a 'Z' suffix."""
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _create_repo_record(client: Client, collection: str, record: Dict[str, Any], failure_log_prefix: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    POST ``record`` to com.atproto.repo.createRecord on the configured PDS.

    The access token and DID are read from the client's session. The PDS host
    comes from the 'pds_uri' entry of get_bluesky_config(), defaulting to
    https://bsky.social (the same default default_login uses).

    Args:
        client: Authenticated Bluesky client
        collection: Collection (NSID) to create the record in
        record: The record body
        failure_log_prefix: When given, a non-200 response is logged as
            "<prefix>: <status> - <body>" before the HTTP error is raised

    Returns:
        The decoded JSON response, or None when the session information is
        unavailable (already logged).

    Raises:
        Exception: import, configuration, network and HTTP errors propagate
            to the caller, which decides how to report them.
    """
    import requests

    # Try different ways to get the session info
    if hasattr(client, '_session') and client._session:
        access_token = client._session.access_jwt
        user_did = client._session.did
    elif hasattr(client, 'access_jwt'):
        access_token = client.access_jwt
        user_did = client.did if hasattr(client, 'did') else None
    else:
        logger.error("Cannot access client session information")
        return None

    if not access_token or not user_did:
        logger.error("Missing access token or DID from session")
        return None

    # Get PDS URI from config instead of environment variables
    from config_loader import get_bluesky_config
    bluesky_config = get_bluesky_config()
    pds_host = bluesky_config.get('pds_uri') or 'https://bsky.social'
    # Avoid "//xrpc" when the configured URI ends with a slash
    create_record_url = f"{pds_host.rstrip('/')}/xrpc/com.atproto.repo.createRecord"

    headers = {"Authorization": f"Bearer {access_token}"}
    create_data = {
        "repo": user_did,
        "collection": collection,
        "record": record
    }

    response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
    if failure_log_prefix and response.status_code != 200:
        logger.error(f"{failure_log_prefix}: {response.status_code} - {response.text}")
    response.raise_for_status()
    return response.json()


def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record for synthesis without a target post.

    This creates a synthesis acknowledgment with null subject field.

    Args:
        client: Authenticated Bluesky client
        note: The synthesis note/content

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": None,  # Null subject for synthesis
            "createdAt": _utc_timestamp(),
            "note": note
        }

        result = _create_repo_record(client, "stream.thought.ack", ack_record)
        if result is None:
            return None

        logger.info("Successfully created synthesis acknowledgment")
        return result

    except Exception as e:
        logger.error(f"Error creating synthesis acknowledgment: {e}")
        return None


def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    This creates a custom acknowledgment record instead of a standard Bluesky like,
    allowing void to track which posts it has engaged with.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": {
                "uri": post_uri,
                "cid": post_cid
            },
            "createdAt": _utc_timestamp(),
            "note": note  # Will be null if no note provided
        }

        result = _create_repo_record(client, "stream.thought.ack", ack_record)
        if result is None:
            return None

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None


def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.tool.call record to track tool usage.

    This creates a record of tool calls made by void during processing,
    allowing for analysis of tool usage patterns and debugging.

    Args:
        client: Authenticated Bluesky client
        tool_name: Name of the tool being called
        arguments: Raw JSON string of the tool arguments
        tool_call_id: Optional ID of the tool call for correlation

    Returns:
        The response from creating the tool call record or None if failed
    """
    try:
        tool_record = {
            "$type": "stream.thought.tool.call",
            "tool_name": tool_name,
            "arguments": arguments,  # Store as string to avoid parsing issues
            "createdAt": _utc_timestamp()
        }

        # Add tool_call_id if provided
        if tool_call_id:
            tool_record["tool_call_id"] = tool_call_id

        result = _create_repo_record(
            client,
            "stream.thought.tool.call",
            tool_record,
            failure_log_prefix="Tool call record creation failed",
        )
        if result is None:
            return None

        logger.debug(f"Successfully recorded tool call: {tool_name}")
        return result

    except Exception as e:
        logger.error(f"Error creating tool call record: {e}")
        return None
981
982
def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.reasoning record to track agent reasoning.

    This creates a record of void's reasoning during message processing,
    providing transparency into the decision-making process. The record is
    written with a direct POST to ``com.atproto.repo.createRecord`` on the
    PDS host taken from the Bluesky config, authenticated with the access
    token of the client's current session.

    Args:
        client: Authenticated Bluesky client
        reasoning_text: The reasoning text from the agent

    Returns:
        The parsed JSON response from creating the reasoning record, or None
        if the session could not be read, the request failed, or any other
        error occurred (errors are logged, never raised).
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Resolve the access token and DID from whichever session layout the client exposes
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # Get PDS URI from config instead of environment variables
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        # Drop any trailing slash so the endpoint URL never contains "//xrpc"
        pds_host = bluesky_config['pds_uri'].rstrip('/')

        # Create reasoning record (UTC timestamp with a "Z" suffix)
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        reasoning_record = {
            "$type": "stream.thought.reasoning",
            "reasoning": reasoning_text,
            "createdAt": now,
        }

        # Create the record
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        create_data = {
            "repo": user_did,
            "collection": "stream.thought.reasoning",
            "record": reasoning_record,
        }

        response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
        if response.status_code != 200:
            # Log the server's explanation before raising; the exception text
            # alone does not include the response body.
            logger.error(f"Reasoning record creation failed: {response.status_code} - {response.text}")
        response.raise_for_status()
        result = response.json()

        logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
        return result

    except Exception as e:
        # Best-effort: recording reasoning must never break message processing
        logger.error(f"Error creating reasoning record: {e}")
        return None
1054
1055
def create_memory_record(client: Client, content: str, tags: Optional[List[str]] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.memory record to store archival memory insertions.

    This creates a record of archival_memory_insert tool calls, preserving
    important memories and context in the AT Protocol. The record is written
    with a direct POST to ``com.atproto.repo.createRecord`` on the PDS host
    taken from the Bluesky config, authenticated with the access token of the
    client's current session.

    Args:
        client: Authenticated Bluesky client
        content: The memory content being archived
        tags: Optional list of tags associated with this memory. The "tags"
            field is omitted from the record only when this is None; an empty
            list is stored as an empty list.

    Returns:
        The parsed JSON response from creating the memory record, or None if
        the session could not be read, the request failed, or any other error
        occurred (errors are logged, never raised).
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Resolve the access token and DID from whichever session layout the client exposes
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # Get PDS URI from config instead of environment variables
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        # Drop any trailing slash so the endpoint URL never contains "//xrpc"
        pds_host = bluesky_config['pds_uri'].rstrip('/')

        # Create memory record (UTC timestamp with a "Z" suffix)
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        memory_record = {
            "$type": "stream.thought.memory",
            "content": content,
            "createdAt": now,
        }

        # Add tags if provided (can be null)
        if tags is not None:
            memory_record["tags"] = tags

        # Create the record
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        create_data = {
            "repo": user_did,
            "collection": "stream.thought.memory",
            "record": memory_record,
        }

        response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
        if response.status_code != 200:
            # Log the server's explanation before raising; the exception text
            # alone does not include the response body.
            logger.error(f"Memory record creation failed: {response.status_code} - {response.text}")
        response.raise_for_status()
        result = response.json()

        tags_info = f" with {len(tags)} tags" if tags else " (no tags)"
        logger.debug(f"Successfully recorded memory (length: {len(content)} chars{tags_info})")
        return result

    except Exception as e:
        # Best-effort: archiving a memory must never break the caller
        logger.error(f"Error creating memory record: {e}")
        return None
1133
1134
def sync_followers(client: Client, dry_run: bool = False) -> Dict[str, Any]:
    """
    Check who is following the bot and who the bot is following,
    then follow back users who aren't already followed.

    This implements the autofollow feature by creating follow records
    (app.bsky.graph.follow) for users who follow the bot. Both the follower
    list and the following list are fetched in full by walking the API's
    cursor pagination; comparing only the first page would make accounts
    that are already followed look unfollowed and produce duplicate follow
    records.

    Args:
        client: Authenticated Bluesky client
        dry_run: If True, only report what would be done without actually following

    Returns:
        On success, a dict with stats: {
            'followers_count': int,
            'following_count': int,
            'to_follow': List[str],  # List of handles to follow
            'newly_followed': List[str],  # List of handles actually followed (empty if dry_run)
            'errors': List[str]  # Any errors encountered while following
        }
        If the session cannot be read, or fetching the lists / loading the
        config fails, a dict of the form {'error': str} is returned instead.
    """
    try:
        from datetime import datetime, timezone

        # Resolve the access token and DID from whichever session layout the client exposes
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return {'error': 'Cannot access client session'}

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return {'error': 'Missing access token or DID'}

        # Get PDS URI from config
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        # Drop any trailing slash so the endpoint URL never contains "//xrpc"
        pds_host = bluesky_config['pds_uri'].rstrip('/')

        # Fetch the complete follower and following lists (all pages)
        followers = _collect_graph_pages(client.app.bsky.graph.get_followers, user_did, 'followers')
        following = _collect_graph_pages(client.app.bsky.graph.get_follows, user_did, 'follows')

        # Find users who follow us but we don't follow back
        follower_dids = {f.did for f in followers}
        following_dids = {f.did for f in following}
        to_follow_dids = follower_dids - following_dids

        # Map each DID to follow onto its handle once, in follower-list order,
        # so follows happen in a stable order and each account appears once.
        handle_by_did = {}
        for follower in followers:
            if follower.did in to_follow_dids and follower.did not in handle_by_did:
                handle_by_did[follower.did] = getattr(follower, 'handle', follower.did)

        # Build result object
        result = {
            'followers_count': len(followers),
            'following_count': len(following),
            'to_follow': list(handle_by_did.values()),
            'newly_followed': [],
            'errors': []
        }

        logger.info(f"Follower sync: {len(followers)} followers, {len(following)} following, {len(to_follow_dids)} to follow")

        # If dry run, just return the stats
        if dry_run:
            logger.info(f"Dry run - would follow: {', '.join(result['to_follow'])}")
            return result

        # Actually follow the users with rate limiting
        import requests
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        for i, (did, handle) in enumerate(handle_by_did.items()):
            try:
                # Rate limiting: wait 2 seconds between follows to avoid spamming the server
                if i > 0:
                    time.sleep(2)

                # Create follow record (UTC timestamp with a "Z" suffix)
                now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
                follow_record = {
                    "$type": "app.bsky.graph.follow",
                    "subject": did,
                    "createdAt": now
                }

                create_data = {
                    "repo": user_did,
                    "collection": "app.bsky.graph.follow",
                    "record": follow_record
                }

                response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
                response.raise_for_status()

                result['newly_followed'].append(handle)
                logger.info(f"Followed: {handle}")

            except Exception as e:
                # One failed follow must not stop the rest; record it and move on
                error_msg = f"Failed to follow {did}: {e}"
                logger.error(error_msg)
                result['errors'].append(error_msg)

        return result

    except Exception as e:
        logger.error(f"Error syncing followers: {e}")
        return {'error': str(e)}


def _collect_graph_pages(fetch_page, actor: str, items_attr: str, page_size: int = 100) -> List[Any]:
    """Collect every item from a cursor-paginated graph listing.

    Args:
        fetch_page: Callable taking a params dict ('actor', 'limit', optional
            'cursor') and returning a response object for one page.
        actor: DID or handle whose listing is fetched.
        items_attr: Name of the response attribute holding the page's items
            (e.g. 'followers' or 'follows').
        page_size: Number of items requested per page.

    Returns:
        All items from all pages, in the order the pages returned them.
    """
    items: List[Any] = []
    seen_cursors = set()
    cursor = None

    while True:
        params = {'actor': actor, 'limit': page_size}
        if cursor:
            params['cursor'] = cursor

        response = fetch_page(params)
        page_items = getattr(response, items_attr, None) or []
        items.extend(page_items)

        cursor = getattr(response, 'cursor', None)
        # Stop on the last page, on an empty page, or if the server repeats a
        # cursor (which would otherwise loop forever).
        if not cursor or not page_items or cursor in seen_cursors:
            return items
        seen_cursors.add(cursor)
1262
1263
if __name__ == "__main__":
    # Script entry point: runs only when this file is executed directly, not
    # when it is imported. It obtains a client via default_login() (defined
    # earlier in this file; presumably it authenticates with the configured
    # credentials — confirm there) and logs that the client is ready.
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")