a digital person for bluesky
1import os
2import logging
3import uuid
4import time
5from typing import Optional, Dict, Any, List
6from atproto_client import Client, Session, SessionEvent, models
7
# Root logging setup: INFO and above, one line per record formatted as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Named logger used by the functions in this module.
logger = logging.getLogger("bluesky_session_handler")
13
# Load environment variables (e.g. PDS_URI, read by init_client) from a .env
# file at import time. override=True: per python-dotenv semantics, values from
# the .env file take precedence over variables already set in the environment.
import dotenv
dotenv.load_dotenv(override=True)
17
18import yaml
19import json
20
# Keys that strip_fields() removes (at any nesting depth) from thread data
# before it is rendered as YAML for the LLM: identifiers, counters, viewer
# state, media/blob details, timestamps and similar metadata.
# Kept as a list, in this exact order.
STRIP_FIELDS = [
    "cid", "rev", "did", "uri", "langs", "threadgate", "py_type", "labels",
    "avatar", "viewer", "indexed_at", "tags", "associated", "thread_context",
    "aspect_ratio", "thumb", "fullsize", "root", "created_at", "verification",
    # engagement counters
    "like_count", "quote_count", "reply_count", "repost_count",
    "embedding_disabled", "thread_muted", "reply_disabled", "pinned",
    "like", "repost",
    # viewer-relationship flags
    "blocked_by", "blocking", "blocking_by_list", "followed_by", "following",
    "known_followers", "muted", "muted_by_list",
    "root_author_like", "entities", "ref", "mime_type", "size",
]
def convert_to_basic_types(obj):
    """
    Recursively convert *obj* into plain Python types for JSON/YAML serialization.

    Conversion rules, checked in this order:
    - dicts (including subclasses) -> dict with recursively converted values;
    - lists, tuples, sets and frozensets -> list of converted items;
    - any other object exposing ``__dict__`` -> its converted attribute dict;
    - str / int / float / bool / None -> returned unchanged;
    - anything else -> ``str(obj)``.

    Containers are checked before ``__dict__`` on purpose: dict/list
    subclasses (e.g. ``collections.OrderedDict`` or a user ``class D(dict)``)
    also have an attribute ``__dict__``, usually empty, and converting that
    instead of the container would silently drop all of its contents.
    """
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [convert_to_basic_types(item) for item in obj]
    if hasattr(obj, '__dict__'):
        # Objects with attributes are represented by their attribute mapping.
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    # For other types, fall back to their string form.
    return str(obj)
81
82
def _is_empty_value(value):
    """Return True for values that carry no information once stripped:
    None, empty dicts/lists, and blank (whitespace-only) strings."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    if isinstance(value, str):
        return value.strip() == ""
    return False


def strip_fields(obj, strip_field_list):
    """
    Recursively strip fields from a JSON-like structure, in place.

    - Dict keys listed in *strip_field_list*, and string keys starting with
      "__" (pydantic/Python internals), are removed.
    - Values are then processed recursively; afterwards any dict entry or
      list item that is None, an empty dict/list, or a blank string is
      dropped. Dicts and lists get the same emptiness rule, so a list of
      dicts that were stripped down to ``{}`` does not leave ``[{}]`` behind.

    Args:
        obj: A dict, list, or leaf value (typically the output of
            convert_to_basic_types).
        strip_field_list: Collection of key names to remove (tested with ``in``).

    Returns:
        *obj* itself, mutated in place, for dicts and lists; any other value
        is returned unchanged.
    """
    if isinstance(obj, dict):
        # Drop listed keys and dunder metadata. Non-string keys are allowed
        # and simply cannot match the "__" prefix rule.
        for key in list(obj.keys()):
            if key in strip_field_list or (isinstance(key, str) and key.startswith("__")):
                del obj[key]

        # Recurse into what is left, removing entries that end up empty.
        for key in list(obj.keys()):
            cleaned = strip_fields(obj[key], strip_field_list)
            if _is_empty_value(cleaned):
                del obj[key]
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        cleaned_items = (strip_fields(item, strip_field_list) for item in obj)
        obj[:] = [item for item in cleaned_items if not _is_empty_value(item)]

    return obj
116
117
def flatten_thread_structure(thread_data):
    """
    Flatten the ancestor chain of a thread into a chronological list of posts.

    Starting at ``thread_data.thread`` (or ``thread_data["thread"]``), the
    chain of ``parent`` links is followed upward. Posts are returned from the
    topmost ancestor down to the requested post. Replies below the requested
    post are NOT included.

    Both attribute-style objects (as returned by get_post_thread) and plain
    dicts with the same keys are accepted, at every level.

    Args:
        thread_data: The thread data from get_post_thread, or an equivalent dict

    Returns:
        Dict with a 'posts' key holding a list of shallow copies of each
        post's attribute dict (or of the post dict itself). A post that is
        neither becomes ``{}``. Nodes without a post are skipped.
    """
    def _field(container, name):
        # Uniform access for dict-shaped and attribute-shaped nodes.
        if isinstance(container, dict):
            return container.get(name)
        return getattr(container, name, None)

    # Walk upward from the requested post. The loop replaces recursion, and
    # `seen` guards against a malformed parent cycle.
    chain = []
    seen = set()
    node = _field(thread_data, 'thread')
    while node and id(node) not in seen:
        seen.add(id(node))
        chain.append(node)
        node = _field(node, 'parent')

    posts = []
    for node in reversed(chain):  # oldest ancestor first
        post = _field(node, 'post')
        if not post:
            continue
        if hasattr(post, '__dict__'):
            posts.append(post.__dict__.copy())
        elif isinstance(post, dict):
            posts.append(post.copy())
        else:
            posts.append({})

    return {'posts': posts}
160
161
def count_thread_posts(thread):
    """
    Return how many posts flatten_thread_structure extracts from *thread*.

    Args:
        thread: The thread data from get_post_thread

    Returns:
        Integer count of posts in the flattened thread
    """
    posts = flatten_thread_structure(thread).get('posts', [])
    return len(posts)
174
175
def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Render a thread as a YAML string suitable for an LLM prompt.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: When true, remove the keys in STRIP_FIELDS (and empty
            values) so the output stays compact

    Returns:
        YAML text with 2-space indentation, block style, and unicode kept as-is
    """
    # Flatten first so the YAML is a flat list of posts, not a deep parent chain.
    plain = convert_to_basic_types(flatten_thread_structure(thread))

    # strip_fields edits `plain` in place. That is safe here because
    # convert_to_basic_types just built a fresh structure.
    payload = strip_fields(plain, STRIP_FIELDS) if strip_metadata else plain

    return yaml.dump(payload, indent=2, allow_unicode=True, default_flow_style=False)
200
201
202
203
204
205
206
def get_session(username: str) -> Optional[str]:
    """Return the saved session string for *username*, or None if no
    ``session_<username>.txt`` file exists in the working directory."""
    session_path = f"session_{username}.txt"
    try:
        with open(session_path, encoding="UTF-8") as handle:
            contents = handle.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None
    return contents
214
def save_session(username: str, session_string: str) -> None:
    """
    Persist *session_string* to ``session_<username>.txt`` in the working directory.

    The exported session is enough to log in without a password (init_client
    passes it to ``client.login(session_string=...)``), so:
    - it is first written to a temporary file created with owner-only
      permissions (0o600; honoured on POSIX), and
    - that file is then moved over the target with ``os.replace``, so a crash
      mid-write never leaves a truncated session file for get_session to load.

    Raises:
        OSError: if the file cannot be written or moved (the temp file is
            removed first).
    """
    session_path = f"session_{username}.txt"
    temp_path = f"{session_path}.tmp"

    def _owner_only(path, flags):
        # Used as open()'s opener so a newly created file gets mode 0o600.
        return os.open(path, flags, 0o600)

    try:
        with open(temp_path, "w", encoding="UTF-8", opener=_owner_only) as f:
            f.write(session_string)
        os.replace(temp_path, session_path)
    except BaseException:
        # Do not leave a partial temp file behind; the original error propagates.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise

    logger.debug(f"Session saved for {username}")
219
def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """
    Session-change callback (registered by init_client): persist the session
    for *username* whenever it is created or refreshed.

    The session object itself is not logged because its repr may include the
    access/refresh tokens.
    """
    logger.debug(f"Session changed: {event} (user {username})")
    if event not in (SessionEvent.CREATE, SessionEvent.REFRESH):
        return
    logger.debug(f"Saving changed session for {username}")
    save_session(username, session.export())
225
def init_client(username: str, password: str) -> Client:
    """
    Create an authenticated Client for *username*, reusing a saved session when possible.

    The PDS comes from the ``PDS_URI`` environment variable. Unset or empty
    falls back to https://bsky.social. Session creation and refresh events are
    persisted through on_session_change. A saved session string is tried
    first; if logging in with it fails (e.g. expired or corrupt tokens), a
    fresh username/password login is performed instead.

    Args:
        username: Bluesky handle to log in as
        password: Password (app password) used when no usable session exists

    Returns:
        The logged-in Client.

    Raises:
        Whatever ``client.login(username, password)`` raises if the password
        login itself fails.
    """
    pds_uri = os.getenv("PDS_URI")
    if not pds_uri:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # A stale or corrupt session file must not lock the bot out.
            logger.warning(
                f"Could not reuse saved session for {username} ({e}); logging in with password"
            )

    logger.debug(f"Creating new session for {username}")
    client.login(username, password)
    return client
251
252
def default_login() -> Client:
    """
    Login using configuration from config.yaml or environment variables.

    Reads ``username``, ``password`` and optional ``pds_uri`` from
    ``config_loader.get_bluesky_config()``. A missing or empty ``pds_uri``
    falls back to https://bsky.social.

    Returns:
        An authenticated Client.

    Raises:
        SystemExit: with code 1 if the configuration cannot be loaded or the
            login fails. Each case is logged with its own message first.
    """
    try:
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()

        username = bluesky_config['username']
        password = bluesky_config['password']
        pds_uri = bluesky_config.get('pds_uri') or 'https://bsky.social'
    except Exception as e:
        logger.error(f"Failed to load Bluesky configuration: {e}")
        logger.error("Please check your config.yaml file or environment variables")
        # SystemExit directly: the `exit` builtin is only installed by `site`.
        raise SystemExit(1) from e

    logger.info(f"Logging into Bluesky as {username} via {pds_uri}")

    try:
        client = Client(base_url=pds_uri)
        client.login(username, password)
    except Exception as e:
        logger.error(f"Failed to log into Bluesky as {username} via {pds_uri}: {e}")
        raise SystemExit(1) from e

    return client
274
def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Inputs of two or more characters are stripped of surrounding whitespace
    before the quote check. A lone double-quote character (even with
    surrounding whitespace) is returned as-is rather than being treated as
    both the opening and the closing quote.

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Re-check the length after stripping so the opening and closing quote
    # are two distinct characters.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text
299
def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Mentions (``@handle.tld``), ``http(s)://`` links and ``#hashtags`` found in
    *text* become rich-text facets. Facet offsets are byte offsets into
    ``text.encode("UTF-8")``, which is what the regexes below scan. A mention
    is only linked when its handle resolves via ``get_profile``; otherwise it
    stays plain text.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        The response from ``client.send_post`` (its ``uri`` and ``cid`` are
        read for logging and by callers that chain replies).

    Raises:
        Exception: anything raised while building or sending the reply is
            logged and re-raised.
    """
    import re

    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    logger.info(f"[{correlation_id}] Starting reply_to_post", extra={
        'correlation_id': correlation_id,
        'text_length': len(text),
        'text_preview': text[:100] + '...' if len(text) > 100 else text,
        'reply_to_uri': reply_to_uri,
        'root_uri': root_uri,
        'lang': lang
    })

    start_time = time.time()

    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    facets = []
    text_bytes = text.encode("UTF-8")
    mentions_found = []
    urls_found = []

    logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)")

    # Mentions: group 1 is "@handle"; the optional leading separator is outside it.
    mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"

    for m in re.finditer(mention_regex, text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        mentions_found.append(handle)
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
            if resolve_resp and hasattr(resolve_resp, 'did'):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            byteStart=m.start(1),
                            byteEnd=m.end(1)
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                    )
                )
                logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}")
        except Exception as e:
            logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}")
            continue

    # URLs: group 1 is the URL itself, without the leading separator.
    url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"

    for m in re.finditer(url_regex, text_bytes):
        url = m.group(1).decode("UTF-8")
        urls_found.append(url)
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)]
            )
        )
        logger.debug(f"[{correlation_id}] Found URL: {url}")

    # Hashtags: group 1 is exactly "#tag" and group 2 the tag text. Using
    # group 1's span keeps any preceding separator (space, tab, newline, "$",
    # "|") out of the facet.
    hashtag_regex = rb"(?:^|[$|\s])(#([a-zA-Z0-9_]+))"
    hashtags_found = []

    for m in re.finditer(hashtag_regex, text_bytes):
        tag = m.group(2).decode("UTF-8")  # Tag without the # prefix
        hashtags_found.append(tag)
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Tag(tag=tag)]
            )
        )
        logger.debug(f"[{correlation_id}] Found hashtag: #{tag}")

    logger.debug(f"[{correlation_id}] Facet parsing complete", extra={
        'correlation_id': correlation_id,
        'mentions_count': len(mentions_found),
        'mentions': mentions_found,
        'urls_count': len(urls_found),
        'urls': urls_found,
        'hashtags_count': len(hashtags_found),
        'hashtags': hashtags_found,
        'total_facets': len(facets)
    })

    logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={
        'correlation_id': correlation_id,
        'has_facets': bool(facets),
        'facet_count': len(facets),
        'lang': lang
    })

    try:
        # facets=None is send_post's default, i.e. the same as omitting it.
        response = client.send_post(
            text=text,
            reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
            facets=facets or None,
            langs=[lang] if lang else None
        )

        response_time = time.time() - start_time

        # Build a web link for the log. at://<did>/app.bsky.feed.post/<rkey>
        # splits into ['at:', '', '<did>', 'app.bsky.feed.post', '<rkey>'].
        post_uri = getattr(response, 'uri', None)
        post_url = None
        if isinstance(post_uri, str):
            uri_parts = post_uri.split('/')
            if len(uri_parts) >= 5 and uri_parts[3] == 'app.bsky.feed.post':
                # bsky.app accepts a DID in place of a handle in profile URLs.
                post_url = f"https://bsky.app/profile/{uri_parts[2]}/post/{uri_parts[4]}"

        logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {post_uri}" +
                    (f" - URL: {post_url}" if post_url else ""), extra={
            'correlation_id': correlation_id,
            'response_time': round(response_time, 3),
            'post_uri': post_uri,
            'post_url': post_url,
            'post_cid': getattr(response, 'cid', None),
            'text_length': len(text)
        })

        return response

    except Exception as e:
        response_time = time.time() - start_time
        logger.error(f"[{correlation_id}] Failed to send reply", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__,
            'response_time': round(response_time, 3),
            'text_length': len(text)
        })
        raise
496

def get_post_thread(client: Client, uri: str, parent_height: int = 60, depth: int = 10) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post
        parent_height: Levels of parent posts to request (default 60)
        depth: Levels of replies to request (default 10)

    Returns:
        Whatever ``client.app.bsky.feed.get_post_thread`` returns (the thread
        response object, despite the ``Dict`` annotation), or None if the
        request raised.
    """
    try:
        return client.app.bsky.feed.get_post_thread(
            {'uri': uri, 'parent_height': parent_height, 'depth': depth}
        )
    except Exception as e:
        logger.error(f"Error fetching post thread for {uri}: {e}")
        return None
515
516
def _notification_field(container: Any, name: str) -> Any:
    """Return ``container[name]`` for dicts or ``container.name`` for other objects; None if missing."""
    if isinstance(container, dict):
        return container.get(name)
    return getattr(container, name, None)


def _notification_reply_targets(notification: Any) -> Optional[tuple]:
    """
    Work out where a reply to *notification* should attach.

    Dicts and attribute-style objects are accepted at every level
    (notification, record, reply, root).

    Returns:
        ``(post_uri, post_cid, root_uri, root_cid)``, or None when the
        notification lacks a uri or cid. When the notified post's record names
        a reply root with both uri and cid, that root is used. Otherwise the
        notified post itself is treated as the thread root.
    """
    post_uri = _notification_field(notification, 'uri')
    post_cid = _notification_field(notification, 'cid')
    if not post_uri or not post_cid:
        return None

    record = _notification_field(notification, 'record')
    reply_info = _notification_field(record, 'reply') if record is not None else None
    root = _notification_field(reply_info, 'root') if reply_info else None
    root_uri = _notification_field(root, 'uri') if root else None
    root_cid = _notification_field(root, 'cid') if root else None

    # Only trust a complete root reference; a uri without its cid (or vice
    # versa) cannot form a valid strong ref.
    if root_uri and root_cid:
        return post_uri, post_cid, root_uri, root_cid
    return post_uri, post_cid, post_uri, post_cid


def _reply_or_none(client: Client, text: str, parent_uri: str, parent_cid: str, root_uri: str, root_cid: str, lang: str, correlation_id: str) -> Optional[Any]:
    """Call reply_to_post, turning a raised exception into a logged ``None``."""
    try:
        return reply_to_post(
            client=client,
            text=text,
            reply_to_uri=parent_uri,
            reply_to_cid=parent_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang,
            correlation_id=correlation_id
        )
    except Exception as e:
        logger.error(f"[{correlation_id}] Reply could not be posted: {e}")
        return None


def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications (or an equivalent dict)
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        The response from sending the reply, or None if the notification has
        no uri/cid or sending failed (errors are logged, not raised).
    """
    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={
        'correlation_id': correlation_id,
        'reply_length': len(reply_text),
        'lang': lang
    })

    try:
        targets = _notification_reply_targets(notification)
        if targets is None:
            logger.error(f"[{correlation_id}] Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = targets

        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang,
            correlation_id=correlation_id
        )

    except Exception as e:
        logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__
        })
        return None


def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message replies to the previous one, all under the same thread root.
    If a message fails to post, a "[SYSTEM FAILURE ...]" notice is posted in
    its place and the chain continues from that notice. If even the notice
    cannot be posted, the chain stops.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications (or an equivalent dict)
        reply_messages: List of reply texts (max 15; the 300-character per-post
            limit is not checked here)
        lang: Language code for the posts (defaults to "en-US")
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        Responses for every post that was sent (including failure notices), or
        None if validation failed, nothing could be posted, or an unexpected
        error occurred.
    """
    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    # Validate before anything measures the messages (None must not raise).
    if not reply_messages:
        logger.error(f"[{correlation_id}] Reply messages list cannot be empty")
        return None
    if len(reply_messages) > 15:
        logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})")
        return None

    try:
        logger.info(f"[{correlation_id}] Starting threaded reply", extra={
            'correlation_id': correlation_id,
            'message_count': len(reply_messages),
            'total_length': sum(len(msg) for msg in reply_messages),
            'lang': lang
        })

        targets = _notification_reply_targets(notification)
        if targets is None:
            logger.error(f"[{correlation_id}] Notification doesn't have required uri/cid fields")
            return None
        post_uri, post_cid, root_uri, root_cid = targets

        responses = []
        parent_uri, parent_cid = post_uri, post_cid
        total = len(reply_messages)

        for index, message in enumerate(reply_messages, start=1):
            thread_correlation_id = f"{correlation_id}-{index}"
            logger.info(f"[{thread_correlation_id}] Sending reply {index}/{total}: {message[:50]}...")

            # reply_to_post raises on failure, so _reply_or_none turns that into
            # None to make the fallback below reachable.
            response = _reply_or_none(client, message, parent_uri, parent_cid,
                                      root_uri, root_cid, lang, thread_correlation_id)
            if not response:
                logger.error(f"[{thread_correlation_id}] Failed to send reply {index}, posting system failure message")
                response = _reply_or_none(client, "[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                                          parent_uri, parent_cid, root_uri, root_cid, lang,
                                          f"{thread_correlation_id}-FAIL")
                if not response:
                    logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The next message replies to whatever was just posted.
            parent_uri, parent_cid = response.uri, response.cid

        logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={
            'correlation_id': correlation_id,
            'replies_sent': len(responses),
            'replies_requested': len(reply_messages)
        })
        return responses

    except Exception as e:
        logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__,
            'message_count': len(reply_messages)
        })
        return None
760
761
def _thought_record_timestamp() -> str:
    """Return the current UTC time in ISO-8601 form with a trailing "Z"."""
    from datetime import datetime, timezone
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _thought_record_credentials(client: Client) -> Optional[tuple]:
    """
    Return ``(access_jwt, did)`` for *client*'s current session.

    ``client._session`` is tried first, then ``client.access_jwt`` /
    ``client.did``. Logs an error and returns None when neither source exists
    or either value is empty.
    """
    session = getattr(client, '_session', None)
    if session:
        access_token = session.access_jwt
        user_did = session.did
    elif hasattr(client, 'access_jwt'):
        access_token = client.access_jwt
        user_did = getattr(client, 'did', None)
    else:
        logger.error("Cannot access client session information")
        return None

    if not access_token or not user_did:
        logger.error("Missing access token or DID from session")
        return None
    return access_token, user_did


def _create_thought_record(client: Client, collection: str, record: Dict[str, Any], label: str) -> Optional[Dict[str, Any]]:
    """
    Create *record* in *collection* of the client's own repo with a direct
    ``com.atproto.repo.createRecord`` HTTP call to the configured PDS.

    Args:
        client: Authenticated Bluesky client (only its session tokens are used)
        collection: NSID of the target collection
        record: The record body, including its "$type"
        label: Human-readable name used in the failure log line

    Returns:
        The decoded JSON response, or None (already logged) when no session
        credentials are available.

    Raises:
        Any error from importing ``requests``/``config_loader``, reading
        ``pds_uri`` from the config, the HTTP request, or ``raise_for_status``.
    """
    import requests

    credentials = _thought_record_credentials(client)
    if credentials is None:
        return None
    access_token, user_did = credentials

    # Get PDS URI from config instead of environment variables. Strip a
    # trailing slash so the URL below never contains "//xrpc".
    from config_loader import get_bluesky_config
    pds_host = get_bluesky_config()['pds_uri'].rstrip('/')

    response = requests.post(
        f"{pds_host}/xrpc/com.atproto.repo.createRecord",
        headers={"Authorization": f"Bearer {access_token}"},
        json={"repo": user_did, "collection": collection, "record": record},
        timeout=10,
    )
    if response.status_code != 200:
        logger.error(f"{label} record creation failed: {response.status_code} - {response.text}")
    response.raise_for_status()
    return response.json()


def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record for synthesis without a target post.

    This creates a synthesis acknowledgment with null subject field.

    Args:
        client: Authenticated Bluesky client
        note: The synthesis note/content

    Returns:
        The decoded createRecord response, or None if failed (errors are logged)
    """
    try:
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": None,  # Null subject for synthesis
            "createdAt": _thought_record_timestamp(),
            "note": note
        }
        result = _create_thought_record(client, "stream.thought.ack", ack_record, "Synthesis acknowledgment")
        if result is None:
            return None

        logger.info("Successfully created synthesis acknowledgment")
        return result

    except Exception as e:
        logger.error(f"Error creating synthesis acknowledgment: {e}")
        return None


def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    This creates a custom acknowledgment record instead of a standard Bluesky like,
    allowing void to track which posts it has engaged with.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The decoded createRecord response, or None if failed (errors are logged)
    """
    try:
        ack_record = {
            "$type": "stream.thought.ack",
            "subject": {
                "uri": post_uri,
                "cid": post_cid
            },
            "createdAt": _thought_record_timestamp(),
            "note": note  # Sent as JSON null if no note provided
        }
        result = _create_thought_record(client, "stream.thought.ack", ack_record, "Acknowledgment")
        if result is None:
            return None

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None


def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.tool_call record to track tool usage.

    This creates a record of tool calls made by void during processing,
    allowing for analysis of tool usage patterns and debugging. The record's
    collection and "$type" are ``stream.thought.tool.call``.

    Args:
        client: Authenticated Bluesky client
        tool_name: Name of the tool being called
        arguments: Raw JSON string of the tool arguments
        tool_call_id: Optional ID of the tool call for correlation

    Returns:
        The decoded createRecord response, or None if failed (errors are logged)
    """
    try:
        tool_record = {
            "$type": "stream.thought.tool.call",
            "tool_name": tool_name,
            "arguments": arguments,  # Store as string to avoid parsing issues
            "createdAt": _thought_record_timestamp()
        }
        if tool_call_id:
            tool_record["tool_call_id"] = tool_call_id

        result = _create_thought_record(client, "stream.thought.tool.call", tool_record, "Tool call")
        if result is None:
            return None

        logger.debug(f"Successfully recorded tool call: {tool_name}")
        return result

    except Exception as e:
        logger.error(f"Error creating tool call record: {e}")
        return None
995
996
def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.reasoning record to track agent reasoning.

    This creates a record of void's reasoning during message processing,
    providing transparency into the decision-making process. The record is
    written to the authenticated account's own repo via
    com.atproto.repo.createRecord on the PDS configured in config_loader.

    Args:
        client: Authenticated Bluesky client
        reasoning_text: The reasoning text from the agent

    Returns:
        The parsed JSON response from createRecord, or None if the session
        information was unavailable or any step failed. Failures are logged
        rather than raised, so recording reasoning never interrupts the caller.
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Get session info from the client
        access_token = None
        user_did = None

        # Try different ways to get the session info
        if hasattr(client, '_session') and client._session:
            access_token = client._session.access_jwt
            user_did = client._session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # Get PDS URI from config instead of environment variables.
        # A trailing slash is stripped so the XRPC URL below never contains "//".
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        pds_host = bluesky_config['pds_uri'].rstrip('/')

        # Create reasoning record (RFC 3339 UTC timestamp with "Z" suffix)
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        reasoning_record = {
            "$type": "stream.thought.reasoning",
            "reasoning": reasoning_text,
            "createdAt": now
        }

        # Create the record
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        create_data = {
            "repo": user_did,
            "collection": "stream.thought.reasoning",
            "record": reasoning_record
        }

        response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
        if not response.ok:
            # raise_for_status() only reports the status line; keep the server's
            # error body in the log so failures are diagnosable.
            logger.error(f"Reasoning record creation failed: {response.status_code} - {response.text}")
        response.raise_for_status()
        result = response.json()

        logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
        return result

    except Exception as e:
        logger.error(f"Error creating reasoning record: {e}")
        return None
1068
1069
def create_memory_record(client: Client, content: str, tags: Optional[List[str]] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.memory record to store archival memory insertions.

    This creates a record of archival_memory_insert tool calls, preserving
    important memories and context in the AT Protocol. The record is written
    to the authenticated account's own repo via com.atproto.repo.createRecord
    on the PDS configured in config_loader.

    Args:
        client: Authenticated Bluesky client
        content: The memory content being archived
        tags: Optional tags associated with this memory. Any iterable of
            strings is accepted and stored as a JSON list. When None, the
            "tags" field is omitted from the record.

    Returns:
        The parsed JSON response from createRecord, or None if the session
        information was unavailable or any step failed. Failures are logged
        rather than raised.
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Get session info from the client
        access_token = None
        user_did = None

        # Try different ways to get the session info
        if hasattr(client, '_session') and client._session:
            access_token = client._session.access_jwt
            user_did = client._session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # Get PDS URI from config instead of environment variables.
        # A trailing slash is stripped so the XRPC URL below never contains "//".
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        pds_host = bluesky_config['pds_uri'].rstrip('/')

        # Create memory record (RFC 3339 UTC timestamp with "Z" suffix)
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        memory_record = {
            "$type": "stream.thought.memory",
            "content": content,
            "createdAt": now
        }

        # Materialize tags as a list: tuples, sets and generators would
        # otherwise either fail JSON encoding (sets) or be consumed.
        tag_list = list(tags) if tags is not None else None
        if tag_list is not None:
            memory_record["tags"] = tag_list

        # Create the record
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        create_data = {
            "repo": user_did,
            "collection": "stream.thought.memory",
            "record": memory_record
        }

        response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
        if not response.ok:
            # raise_for_status() only reports the status line; keep the server's
            # error body in the log so failures are diagnosable.
            logger.error(f"Memory record creation failed: {response.status_code} - {response.text}")
        response.raise_for_status()
        result = response.json()

        tags_info = f" with {len(tag_list)} tags" if tag_list else " (no tags)"
        logger.debug(f"Successfully recorded memory (length: {len(content)} chars{tags_info})")
        return result

    except Exception as e:
        logger.error(f"Error creating memory record: {e}")
        return None
1147
1148
def _collect_graph_pages(fetch_page, actor: str, items_attr: str, page_limit: int = 100) -> List[Any]:
    """Fetch every page of a cursor-paginated app.bsky.graph listing.

    Args:
        fetch_page: Callable taking a params dict (e.g.
            client.app.bsky.graph.get_followers) and returning a response that
            has a list attribute named ``items_attr`` and an optional ``cursor``.
        actor: DID or handle whose graph is being listed.
        items_attr: Name of the list attribute on each page
            ('followers' or 'follows').
        page_limit: Page size requested from the server (the getFollowers /
            getFollows lexicons allow at most 100).

    Returns:
        All items from all pages, in the order the server returned them.
    """
    items: List[Any] = []
    cursor: Optional[str] = None
    seen_cursors = set()
    while True:
        params: Dict[str, Any] = {'actor': actor, 'limit': page_limit}
        if cursor:
            params['cursor'] = cursor
        page = fetch_page(params)
        items.extend(getattr(page, items_attr, None) or [])
        cursor = getattr(page, 'cursor', None)
        # Stop when there are no more pages, or if the server hands back a
        # cursor we've already used (prevents an endless loop).
        if not cursor or cursor in seen_cursors:
            return items
        seen_cursors.add(cursor)


def sync_followers(client: Client, dry_run: bool = False) -> Dict[str, Any]:
    """
    Check who is following the bot and who the bot is following,
    then follow back users who aren't already followed.

    This implements the autofollow feature by creating follow records
    (app.bsky.graph.follow) for users who follow the bot. Both graph listings
    are fully paginated, and follows are created in the order the followers
    were returned, with a 2 second pause between follows.

    Args:
        client: Authenticated Bluesky client
        dry_run: If True, only report what would be done without actually following

    Returns:
        Dict with stats: {
            'followers_count': int,
            'following_count': int,
            'to_follow': List[str],       # Handles (or DIDs if no handle) to follow
            'newly_followed': List[str],  # Handles actually followed (empty if dry_run)
            'errors': List[str]           # Per-follow errors encountered
        }
        If the session, config or graph lookups fail, returns {'error': str}
        instead.
    """
    try:
        from datetime import datetime, timezone

        # Get session info from the client
        access_token = None
        user_did = None

        if hasattr(client, '_session') and client._session:
            access_token = client._session.access_jwt
            user_did = client._session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return {'error': 'Cannot access client session'}

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return {'error': 'Missing access token or DID'}

        # Get PDS URI from config (trailing slash stripped to avoid "//" in URLs)
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        pds_host = bluesky_config['pds_uri'].rstrip('/')

        # Walk every page of both listings. Reading only the first page
        # under-counts, and makes accounts we already follow look un-followed,
        # which would create duplicate follow records.
        followers = _collect_graph_pages(client.app.bsky.graph.get_followers, user_did, 'followers')
        following = _collect_graph_pages(client.app.bsky.graph.get_follows, user_did, 'follows')
        following_dids = {f.did for f in following}

        # Followers we don't follow back: de-duplicated, in server order,
        # mapped to a display handle (falling back to the DID).
        handle_by_did: Dict[str, str] = {}
        for follower in followers:
            if follower.did in following_dids or follower.did in handle_by_did:
                continue
            handle_by_did[follower.did] = getattr(follower, 'handle', None) or follower.did

        # Build result object
        result = {
            'followers_count': len(followers),
            'following_count': len(following),
            'to_follow': list(handle_by_did.values()),
            'newly_followed': [],
            'errors': []
        }

        logger.info(f"Follower sync: {len(followers)} followers, {len(following)} following, {len(handle_by_did)} to follow")

        # If dry run, just return the stats
        if dry_run:
            logger.info(f"Dry run - would follow: {', '.join(result['to_follow'])}")
            return result

        # Actually follow the users with rate limiting
        import requests
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        for i, (did, handle) in enumerate(handle_by_did.items()):
            try:
                # Rate limiting: wait 2 seconds between follows to avoid spamming the server
                if i > 0:
                    time.sleep(2)

                # Create follow record
                now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
                follow_record = {
                    "$type": "app.bsky.graph.follow",
                    "subject": did,
                    "createdAt": now
                }

                create_data = {
                    "repo": user_did,
                    "collection": "app.bsky.graph.follow",
                    "record": follow_record
                }

                response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
                response.raise_for_status()

                result['newly_followed'].append(handle)
                logger.info(f"Followed: {handle}")

            except Exception as e:
                # One failed follow must not abort the rest of the sync.
                error_msg = f"Failed to follow {did}: {e}"
                logger.error(error_msg)
                result['errors'].append(error_msg)

        return result

    except Exception as e:
        logger.error(f"Error syncing followers: {e}")
        return {'error': str(e)}
1276
1277
if __name__ == "__main__":
    # Running this module directly logs in via default_login() and reports
    # readiness; the resulting client is available for ad-hoc experiments.
    bluesky_client = default_login()
    logger.info("Client is ready to use!")