a digital person for bluesky
1import os
2import logging
3import uuid
4import time
5from typing import Optional, Dict, Any, List
6from atproto_client import Client, Session, SessionEvent, models
7
8# Configure logging
9logging.basicConfig(
10 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
11)
12logger = logging.getLogger("bluesky_session_handler")
13
14# Load the environment variables
15import dotenv
16dotenv.load_dotenv(override=True)
17
18import yaml
19import json
20
21# Strip fields. A list of fields to remove from a JSON object
22STRIP_FIELDS = [
23 "cid",
24 "rev",
25 "did",
26 "uri",
27 "langs",
28 "threadgate",
29 "py_type",
30 "labels",
31 "avatar",
32 "viewer",
33 "indexed_at",
34 "tags",
35 "associated",
36 "thread_context",
37 "aspect_ratio",
38 "thumb",
39 "fullsize",
40 "root",
41 "created_at",
42 "verification",
43 "like_count",
44 "quote_count",
45 "reply_count",
46 "repost_count",
47 "embedding_disabled",
48 "thread_muted",
49 "reply_disabled",
50 "pinned",
51 "like",
52 "repost",
53 "blocked_by",
54 "blocking",
55 "blocking_by_list",
56 "followed_by",
57 "following",
58 "known_followers",
59 "muted",
60 "muted_by_list",
61 "root_author_like",
62 "entities",
63 "ref",
64 "mime_type",
65 "size",
66]
67def convert_to_basic_types(obj):
68 """Convert complex Python objects to basic types for JSON/YAML serialization."""
69 if hasattr(obj, '__dict__'):
70 # Convert objects with __dict__ to their dictionary representation
71 return convert_to_basic_types(obj.__dict__)
72 elif isinstance(obj, dict):
73 return {key: convert_to_basic_types(value) for key, value in obj.items()}
74 elif isinstance(obj, list):
75 return [convert_to_basic_types(item) for item in obj]
76 elif isinstance(obj, (str, int, float, bool)) or obj is None:
77 return obj
78 else:
79 # For other types, try to convert to string
80 return str(obj)
81
82
83def strip_fields(obj, strip_field_list):
84 """Recursively strip fields from a JSON object."""
85 if isinstance(obj, dict):
86 keys_flagged_for_removal = []
87
88 # Remove fields from strip list and pydantic metadata
89 for field in list(obj.keys()):
90 if field in strip_field_list or field.startswith("__"):
91 keys_flagged_for_removal.append(field)
92
93 # Remove flagged keys
94 for key in keys_flagged_for_removal:
95 obj.pop(key, None)
96
97 # Recursively process remaining values
98 for key, value in list(obj.items()):
99 obj[key] = strip_fields(value, strip_field_list)
100 # Remove empty/null values after processing
101 if (
102 obj[key] is None
103 or (isinstance(obj[key], dict) and len(obj[key]) == 0)
104 or (isinstance(obj[key], list) and len(obj[key]) == 0)
105 or (isinstance(obj[key], str) and obj[key].strip() == "")
106 ):
107 obj.pop(key, None)
108
109 elif isinstance(obj, list):
110 for i, value in enumerate(obj):
111 obj[i] = strip_fields(value, strip_field_list)
112 # Remove None values from list
113 obj[:] = [item for item in obj if item is not None]
114
115 return obj
116
117
118def flatten_thread_structure(thread_data):
119 """
120 Flatten a nested thread structure into a list while preserving all data.
121
122 Args:
123 thread_data: The thread data from get_post_thread
124
125 Returns:
126 Dict with 'posts' key containing a list of posts in chronological order
127 """
128 posts = []
129
130 def traverse_thread(node):
131 """Recursively traverse the thread structure to collect posts."""
132 if not node:
133 return
134
135 # If this node has a parent, traverse it first (to maintain chronological order)
136 if hasattr(node, 'parent') and node.parent:
137 traverse_thread(node.parent)
138
139 # Then add this node's post
140 if hasattr(node, 'post') and node.post:
141 # Convert to dict if needed to ensure we can process it
142 if hasattr(node.post, '__dict__'):
143 post_dict = node.post.__dict__.copy()
144 elif isinstance(node.post, dict):
145 post_dict = node.post.copy()
146 else:
147 post_dict = {}
148
149 posts.append(post_dict)
150
151 # Handle the thread structure
152 if hasattr(thread_data, 'thread'):
153 # Start from the main thread node
154 traverse_thread(thread_data.thread)
155 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
156 traverse_thread(thread_data.__dict__['thread'])
157
158 # Return a simple structure with posts list
159 return {'posts': posts}
160
161
162def thread_to_yaml_string(thread, strip_metadata=True):
163 """
164 Convert thread data to a YAML-formatted string for LLM parsing.
165
166 Args:
167 thread: The thread data from get_post_thread
168 strip_metadata: Whether to strip metadata fields for cleaner output
169
170 Returns:
171 YAML-formatted string representation of the thread
172 """
173 # First flatten the thread structure to avoid deep nesting
174 flattened = flatten_thread_structure(thread)
175
176 # Convert complex objects to basic types
177 basic_thread = convert_to_basic_types(flattened)
178
179 if strip_metadata:
180 # Create a copy and strip unwanted fields
181 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
182 else:
183 cleaned_thread = basic_thread
184
185 return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)
186
187
188
189
190
191
192
193def get_session(username: str) -> Optional[str]:
194 try:
195 with open(f"session_{username}.txt", encoding="UTF-8") as f:
196 return f.read()
197 except FileNotFoundError:
198 logger.debug(f"No existing session found for {username}")
199 return None
200
201def save_session(username: str, session_string: str) -> None:
202 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
203 f.write(session_string)
204 logger.debug(f"Session saved for {username}")
205
206def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
207 logger.debug(f"Session changed: {event} {repr(session)}")
208 if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
209 logger.debug(f"Saving changed session for {username}")
210 save_session(username, session.export())
211
212def init_client(username: str, password: str) -> Client:
213 pds_uri = os.getenv("PDS_URI")
214 if pds_uri is None:
215 logger.warning(
216 "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
217 )
218 pds_uri = "https://bsky.social"
219
220 # Print the PDS URI
221 logger.debug(f"Using PDS URI: {pds_uri}")
222
223 client = Client(pds_uri)
224 client.on_session_change(
225 lambda event, session: on_session_change(username, event, session)
226 )
227
228 session_string = get_session(username)
229 if session_string:
230 logger.debug(f"Reusing existing session for {username}")
231 client.login(session_string=session_string)
232 else:
233 logger.debug(f"Creating new session for {username}")
234 client.login(username, password)
235
236 return client
237
238
239def default_login() -> Client:
240 """Login using configuration from config.yaml or environment variables."""
241 try:
242 from config_loader import get_bluesky_config
243 bluesky_config = get_bluesky_config()
244
245 username = bluesky_config['username']
246 password = bluesky_config['password']
247 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social')
248
249 logger.info(f"Logging into Bluesky as {username} via {pds_uri}")
250
251 # Use pds_uri from config
252 client = Client(base_url=pds_uri)
253 client.login(username, password)
254 return client
255
256 except Exception as e:
257 logger.error(f"Failed to load Bluesky configuration: {e}")
258 logger.error("Please check your config.yaml file or environment variables")
259 exit(1)
260
261def remove_outside_quotes(text: str) -> str:
262 """
263 Remove outside double quotes from response text.
264
265 Only handles double quotes to avoid interfering with contractions:
266 - Double quotes: "text" → text
267 - Preserves single quotes and internal quotes
268
269 Args:
270 text: The text to process
271
272 Returns:
273 Text with outside double quotes removed
274 """
275 if not text or len(text) < 2:
276 return text
277
278 text = text.strip()
279
280 # Only remove double quotes from start and end
281 if text.startswith('"') and text.endswith('"'):
282 return text[1:-1]
283
284 return text
285
286def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]:
287 """
288 Reply to a post on Bluesky with rich text support.
289
290 Args:
291 client: Authenticated Bluesky client
292 text: The reply text
293 reply_to_uri: The URI of the post being replied to (parent)
294 reply_to_cid: The CID of the post being replied to (parent)
295 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
296 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
297 lang: Language code for the post (e.g., 'en-US', 'es', 'ja')
298 correlation_id: Unique ID for tracking this message through the pipeline
299
300 Returns:
301 The response from sending the post
302 """
303 import re
304
305 # Generate correlation ID if not provided
306 if correlation_id is None:
307 correlation_id = str(uuid.uuid4())[:8]
308
309 # Enhanced logging with structured data
310 logger.info(f"[{correlation_id}] Starting reply_to_post", extra={
311 'correlation_id': correlation_id,
312 'text_length': len(text),
313 'text_preview': text[:100] + '...' if len(text) > 100 else text,
314 'reply_to_uri': reply_to_uri,
315 'root_uri': root_uri,
316 'lang': lang
317 })
318
319 start_time = time.time()
320
321 # If root is not provided, this is a reply to the root post
322 if root_uri is None:
323 root_uri = reply_to_uri
324 root_cid = reply_to_cid
325
326 # Create references for the reply
327 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
328 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))
329
330 # Parse rich text facets (mentions and URLs)
331 facets = []
332 text_bytes = text.encode("UTF-8")
333 mentions_found = []
334 urls_found = []
335
336 logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)")
337
338 # Parse mentions - fixed to handle @ at start of text
339 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
340
341 for m in re.finditer(mention_regex, text_bytes):
342 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix
343 mentions_found.append(handle)
344 # Adjust byte positions to account for the optional prefix
345 mention_start = m.start(1)
346 mention_end = m.end(1)
347 try:
348 # Resolve handle to DID using the API
349 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
350 if resolve_resp and hasattr(resolve_resp, 'did'):
351 facets.append(
352 models.AppBskyRichtextFacet.Main(
353 index=models.AppBskyRichtextFacet.ByteSlice(
354 byteStart=mention_start,
355 byteEnd=mention_end
356 ),
357 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
358 )
359 )
360 logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}")
361 except Exception as e:
362 logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}")
363 continue
364
365 # Parse URLs - fixed to handle URLs at start of text
366 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
367
368 for m in re.finditer(url_regex, text_bytes):
369 url = m.group(1).decode("UTF-8")
370 urls_found.append(url)
371 # Adjust byte positions to account for the optional prefix
372 url_start = m.start(1)
373 url_end = m.end(1)
374 facets.append(
375 models.AppBskyRichtextFacet.Main(
376 index=models.AppBskyRichtextFacet.ByteSlice(
377 byteStart=url_start,
378 byteEnd=url_end
379 ),
380 features=[models.AppBskyRichtextFacet.Link(uri=url)]
381 )
382 )
383 logger.debug(f"[{correlation_id}] Found URL: {url}")
384
385 logger.debug(f"[{correlation_id}] Facet parsing complete", extra={
386 'correlation_id': correlation_id,
387 'mentions_count': len(mentions_found),
388 'mentions': mentions_found,
389 'urls_count': len(urls_found),
390 'urls': urls_found,
391 'total_facets': len(facets)
392 })
393
394 # Send the reply with facets if any were found
395 logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={
396 'correlation_id': correlation_id,
397 'has_facets': bool(facets),
398 'facet_count': len(facets),
399 'lang': lang
400 })
401
402 try:
403 if facets:
404 response = client.send_post(
405 text=text,
406 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
407 facets=facets,
408 langs=[lang] if lang else None
409 )
410 else:
411 response = client.send_post(
412 text=text,
413 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
414 langs=[lang] if lang else None
415 )
416
417 # Calculate response time
418 response_time = time.time() - start_time
419
420 # Extract post URL for user-friendly logging
421 post_url = None
422 if hasattr(response, 'uri') and response.uri:
423 # Convert AT-URI to web URL
424 # Format: at://did:plc:xxx/app.bsky.feed.post/xxx -> https://bsky.app/profile/handle/post/xxx
425 try:
426 uri_parts = response.uri.split('/')
427 if len(uri_parts) >= 4 and uri_parts[3] == 'app.bsky.feed.post':
428 rkey = uri_parts[4]
429 # We'd need to resolve DID to handle, but for now just use the URI
430 post_url = f"bsky://post/{rkey}"
431 except:
432 pass
433
434 logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" +
435 (f" - URL: {post_url}" if post_url else ""), extra={
436 'correlation_id': correlation_id,
437 'response_time': round(response_time, 3),
438 'post_uri': response.uri,
439 'post_url': post_url,
440 'post_cid': getattr(response, 'cid', None),
441 'text_length': len(text)
442 })
443
444 return response
445
446 except Exception as e:
447 response_time = time.time() - start_time
448 logger.error(f"[{correlation_id}] Failed to send reply", extra={
449 'correlation_id': correlation_id,
450 'error': str(e),
451 'error_type': type(e).__name__,
452 'response_time': round(response_time, 3),
453 'text_length': len(text)
454 })
455 raise
456
457
458def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
459 """
460 Get the thread containing a post to find root post information.
461
462 Args:
463 client: Authenticated Bluesky client
464 uri: The URI of the post
465
466 Returns:
467 The thread data or None if not found
468 """
469 try:
470 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
471 return thread
472 except Exception as e:
473 logger.error(f"Error fetching post thread: {e}")
474 return None
475
476
477def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
478 """
479 Reply to a notification (mention or reply).
480
481 Args:
482 client: Authenticated Bluesky client
483 notification: The notification object from list_notifications
484 reply_text: The text to reply with
485 lang: Language code for the post (defaults to "en-US")
486 correlation_id: Unique ID for tracking this message through the pipeline
487
488 Returns:
489 The response from sending the reply or None if failed
490 """
491 # Generate correlation ID if not provided
492 if correlation_id is None:
493 correlation_id = str(uuid.uuid4())[:8]
494
495 logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={
496 'correlation_id': correlation_id,
497 'reply_length': len(reply_text),
498 'lang': lang
499 })
500
501 try:
502 # Get the post URI and CID from the notification (handle both dict and object)
503 if isinstance(notification, dict):
504 post_uri = notification.get('uri')
505 post_cid = notification.get('cid')
506 # Check if the notification record has reply info with root
507 record = notification.get('record', {})
508 reply_info = record.get('reply') if isinstance(record, dict) else None
509 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
510 post_uri = notification.uri
511 post_cid = notification.cid
512 # Check if the notification record has reply info with root
513 reply_info = None
514 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
515 reply_info = notification.record.reply
516 else:
517 post_uri = None
518 post_cid = None
519 reply_info = None
520
521 if not post_uri or not post_cid:
522 logger.error("Notification doesn't have required uri/cid fields")
523 return None
524
525 # Determine root: if post has reply info, use its root; otherwise this post IS the root
526 if reply_info:
527 # Extract root from the notification's reply structure
528 if isinstance(reply_info, dict):
529 root_ref = reply_info.get('root')
530 if root_ref and isinstance(root_ref, dict):
531 root_uri = root_ref.get('uri', post_uri)
532 root_cid = root_ref.get('cid', post_cid)
533 else:
534 # No root in reply info, use post as root
535 root_uri = post_uri
536 root_cid = post_cid
537 elif hasattr(reply_info, 'root'):
538 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
539 root_uri = reply_info.root.uri
540 root_cid = reply_info.root.cid
541 else:
542 root_uri = post_uri
543 root_cid = post_cid
544 else:
545 root_uri = post_uri
546 root_cid = post_cid
547 else:
548 # No reply info means this post IS the root
549 root_uri = post_uri
550 root_cid = post_cid
551
552 # Reply to the notification
553 return reply_to_post(
554 client=client,
555 text=reply_text,
556 reply_to_uri=post_uri,
557 reply_to_cid=post_cid,
558 root_uri=root_uri,
559 root_cid=root_cid,
560 lang=lang,
561 correlation_id=correlation_id
562 )
563
564 except Exception as e:
565 logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={
566 'correlation_id': correlation_id,
567 'error': str(e),
568 'error_type': type(e).__name__
569 })
570 return None
571
572
573def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
574 """
575 Reply to a notification with a threaded chain of messages (max 15).
576
577 Args:
578 client: Authenticated Bluesky client
579 notification: The notification object from list_notifications
580 reply_messages: List of reply texts (max 15 messages, each max 300 chars)
581 lang: Language code for the posts (defaults to "en-US")
582 correlation_id: Unique ID for tracking this message through the pipeline
583
584 Returns:
585 List of responses from sending the replies or None if failed
586 """
587 # Generate correlation ID if not provided
588 if correlation_id is None:
589 correlation_id = str(uuid.uuid4())[:8]
590
591 logger.info(f"[{correlation_id}] Starting threaded reply", extra={
592 'correlation_id': correlation_id,
593 'message_count': len(reply_messages),
594 'total_length': sum(len(msg) for msg in reply_messages),
595 'lang': lang
596 })
597
598 try:
599 # Validate input
600 if not reply_messages or len(reply_messages) == 0:
601 logger.error(f"[{correlation_id}] Reply messages list cannot be empty")
602 return None
603 if len(reply_messages) > 15:
604 logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})")
605 return None
606
607 # Get the post URI and CID from the notification (handle both dict and object)
608 if isinstance(notification, dict):
609 post_uri = notification.get('uri')
610 post_cid = notification.get('cid')
611 # Check if the notification record has reply info with root
612 record = notification.get('record', {})
613 reply_info = record.get('reply') if isinstance(record, dict) else None
614 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
615 post_uri = notification.uri
616 post_cid = notification.cid
617 # Check if the notification record has reply info with root
618 reply_info = None
619 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
620 reply_info = notification.record.reply
621 else:
622 post_uri = None
623 post_cid = None
624 reply_info = None
625
626 if not post_uri or not post_cid:
627 logger.error("Notification doesn't have required uri/cid fields")
628 return None
629
630 # Determine root: if post has reply info, use its root; otherwise this post IS the root
631 if reply_info:
632 # Extract root from the notification's reply structure
633 if isinstance(reply_info, dict):
634 root_ref = reply_info.get('root')
635 if root_ref and isinstance(root_ref, dict):
636 root_uri = root_ref.get('uri', post_uri)
637 root_cid = root_ref.get('cid', post_cid)
638 else:
639 # No root in reply info, use post as root
640 root_uri = post_uri
641 root_cid = post_cid
642 elif hasattr(reply_info, 'root'):
643 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
644 root_uri = reply_info.root.uri
645 root_cid = reply_info.root.cid
646 else:
647 root_uri = post_uri
648 root_cid = post_cid
649 else:
650 root_uri = post_uri
651 root_cid = post_cid
652 else:
653 # No reply info means this post IS the root
654 root_uri = post_uri
655 root_cid = post_cid
656
657 # Send replies in sequence, creating a thread
658 responses = []
659 current_parent_uri = post_uri
660 current_parent_cid = post_cid
661
662 for i, message in enumerate(reply_messages):
663 thread_correlation_id = f"{correlation_id}-{i+1}"
664 logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")
665
666 # Send this reply
667 response = reply_to_post(
668 client=client,
669 text=message,
670 reply_to_uri=current_parent_uri,
671 reply_to_cid=current_parent_cid,
672 root_uri=root_uri,
673 root_cid=root_cid,
674 lang=lang,
675 correlation_id=thread_correlation_id
676 )
677
678 if not response:
679 logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message")
680 # Try to post a system failure message
681 failure_response = reply_to_post(
682 client=client,
683 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
684 reply_to_uri=current_parent_uri,
685 reply_to_cid=current_parent_cid,
686 root_uri=root_uri,
687 root_cid=root_cid,
688 lang=lang,
689 correlation_id=f"{thread_correlation_id}-FAIL"
690 )
691 if failure_response:
692 responses.append(failure_response)
693 current_parent_uri = failure_response.uri
694 current_parent_cid = failure_response.cid
695 else:
696 logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread")
697 return responses if responses else None
698 else:
699 responses.append(response)
700 # Update parent references for next reply (if any)
701 if i < len(reply_messages) - 1: # Not the last message
702 current_parent_uri = response.uri
703 current_parent_cid = response.cid
704
705 logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={
706 'correlation_id': correlation_id,
707 'replies_sent': len(responses),
708 'replies_requested': len(reply_messages)
709 })
710 return responses
711
712 except Exception as e:
713 logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={
714 'correlation_id': correlation_id,
715 'error': str(e),
716 'error_type': type(e).__name__,
717 'message_count': len(reply_messages)
718 })
719 return None
720
721
722def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
723 """
724 Create a stream.thought.ack record for synthesis without a target post.
725
726 This creates a synthesis acknowledgment with null subject field.
727
728 Args:
729 client: Authenticated Bluesky client
730 note: The synthesis note/content
731
732 Returns:
733 The response from creating the acknowledgment record or None if failed
734 """
735 try:
736 import requests
737 import json
738 from datetime import datetime, timezone
739
740 # Get session info from the client
741 access_token = None
742 user_did = None
743
744 # Try different ways to get the session info
745 if hasattr(client, '_session') and client._session:
746 access_token = client._session.access_jwt
747 user_did = client._session.did
748 elif hasattr(client, 'access_jwt'):
749 access_token = client.access_jwt
750 user_did = client.did if hasattr(client, 'did') else None
751 else:
752 logger.error("Cannot access client session information")
753 return None
754
755 if not access_token or not user_did:
756 logger.error("Missing access token or DID from session")
757 return None
758
759 # Get PDS URI from config instead of environment variables
760 from config_loader import get_bluesky_config
761 bluesky_config = get_bluesky_config()
762 pds_host = bluesky_config['pds_uri']
763
764 # Create acknowledgment record with null subject
765 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
766 ack_record = {
767 "$type": "stream.thought.ack",
768 "subject": None, # Null subject for synthesis
769 "createdAt": now,
770 "note": note
771 }
772
773 # Create the record
774 headers = {"Authorization": f"Bearer {access_token}"}
775 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
776
777 create_data = {
778 "repo": user_did,
779 "collection": "stream.thought.ack",
780 "record": ack_record
781 }
782
783 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
784 response.raise_for_status()
785 result = response.json()
786
787 logger.info(f"Successfully created synthesis acknowledgment")
788 return result
789
790 except Exception as e:
791 logger.error(f"Error creating synthesis acknowledgment: {e}")
792 return None
793
794
795def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
796 """
797 Create a stream.thought.ack record to acknowledge a post.
798
799 This creates a custom acknowledgment record instead of a standard Bluesky like,
800 allowing void to track which posts it has engaged with.
801
802 Args:
803 client: Authenticated Bluesky client
804 post_uri: The URI of the post to acknowledge
805 post_cid: The CID of the post to acknowledge
806 note: Optional note to attach to the acknowledgment
807
808 Returns:
809 The response from creating the acknowledgment record or None if failed
810 """
811 try:
812 import requests
813 import json
814 from datetime import datetime, timezone
815
816 # Get session info from the client
817 # The atproto Client stores the session differently
818 access_token = None
819 user_did = None
820
821 # Try different ways to get the session info
822 if hasattr(client, '_session') and client._session:
823 access_token = client._session.access_jwt
824 user_did = client._session.did
825 elif hasattr(client, 'access_jwt'):
826 access_token = client.access_jwt
827 user_did = client.did if hasattr(client, 'did') else None
828 else:
829 logger.error("Cannot access client session information")
830 return None
831
832 if not access_token or not user_did:
833 logger.error("Missing access token or DID from session")
834 return None
835
836 # Get PDS URI from config instead of environment variables
837 from config_loader import get_bluesky_config
838 bluesky_config = get_bluesky_config()
839 pds_host = bluesky_config['pds_uri']
840
841 # Create acknowledgment record with stream.thought.ack type
842 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
843 ack_record = {
844 "$type": "stream.thought.ack",
845 "subject": {
846 "uri": post_uri,
847 "cid": post_cid
848 },
849 "createdAt": now,
850 "note": note # Will be null if no note provided
851 }
852
853 # Create the record
854 headers = {"Authorization": f"Bearer {access_token}"}
855 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
856
857 create_data = {
858 "repo": user_did,
859 "collection": "stream.thought.ack",
860 "record": ack_record
861 }
862
863 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
864 response.raise_for_status()
865 result = response.json()
866
867 logger.info(f"Successfully acknowledged post: {post_uri}")
868 return result
869
870 except Exception as e:
871 logger.error(f"Error acknowledging post: {e}")
872 return None
873
874
875def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
876 """
877 Create a stream.thought.tool_call record to track tool usage.
878
879 This creates a record of tool calls made by void during processing,
880 allowing for analysis of tool usage patterns and debugging.
881
882 Args:
883 client: Authenticated Bluesky client
884 tool_name: Name of the tool being called
885 arguments: Raw JSON string of the tool arguments
886 tool_call_id: Optional ID of the tool call for correlation
887
888 Returns:
889 The response from creating the tool call record or None if failed
890 """
891 try:
892 import requests
893 import json
894 from datetime import datetime, timezone
895
896 # Get session info from the client
897 access_token = None
898 user_did = None
899
900 # Try different ways to get the session info
901 if hasattr(client, '_session') and client._session:
902 access_token = client._session.access_jwt
903 user_did = client._session.did
904 elif hasattr(client, 'access_jwt'):
905 access_token = client.access_jwt
906 user_did = client.did if hasattr(client, 'did') else None
907 else:
908 logger.error("Cannot access client session information")
909 return None
910
911 if not access_token or not user_did:
912 logger.error("Missing access token or DID from session")
913 return None
914
915 # Get PDS URI from config instead of environment variables
916 from config_loader import get_bluesky_config
917 bluesky_config = get_bluesky_config()
918 pds_host = bluesky_config['pds_uri']
919
920 # Create tool call record
921 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
922 tool_record = {
923 "$type": "stream.thought.tool.call",
924 "tool_name": tool_name,
925 "arguments": arguments, # Store as string to avoid parsing issues
926 "createdAt": now
927 }
928
929 # Add tool_call_id if provided
930 if tool_call_id:
931 tool_record["tool_call_id"] = tool_call_id
932
933 # Create the record
934 headers = {"Authorization": f"Bearer {access_token}"}
935 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
936
937 create_data = {
938 "repo": user_did,
939 "collection": "stream.thought.tool.call",
940 "record": tool_record
941 }
942
943 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
944 if response.status_code != 200:
945 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}")
946 response.raise_for_status()
947 result = response.json()
948
949 logger.debug(f"Successfully recorded tool call: {tool_name}")
950 return result
951
952 except Exception as e:
953 logger.error(f"Error creating tool call record: {e}")
954 return None
955
956
957def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
958 """
959 Create a stream.thought.reasoning record to track agent reasoning.
960
961 This creates a record of void's reasoning during message processing,
962 providing transparency into the decision-making process.
963
964 Args:
965 client: Authenticated Bluesky client
966 reasoning_text: The reasoning text from the agent
967
968 Returns:
969 The response from creating the reasoning record or None if failed
970 """
971 try:
972 import requests
973 import json
974 from datetime import datetime, timezone
975
976 # Get session info from the client
977 access_token = None
978 user_did = None
979
980 # Try different ways to get the session info
981 if hasattr(client, '_session') and client._session:
982 access_token = client._session.access_jwt
983 user_did = client._session.did
984 elif hasattr(client, 'access_jwt'):
985 access_token = client.access_jwt
986 user_did = client.did if hasattr(client, 'did') else None
987 else:
988 logger.error("Cannot access client session information")
989 return None
990
991 if not access_token or not user_did:
992 logger.error("Missing access token or DID from session")
993 return None
994
995 # Get PDS URI from config instead of environment variables
996 from config_loader import get_bluesky_config
997 bluesky_config = get_bluesky_config()
998 pds_host = bluesky_config['pds_uri']
999
1000 # Create reasoning record
1001 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
1002 reasoning_record = {
1003 "$type": "stream.thought.reasoning",
1004 "reasoning": reasoning_text,
1005 "createdAt": now
1006 }
1007
1008 # Create the record
1009 headers = {"Authorization": f"Bearer {access_token}"}
1010 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
1011
1012 create_data = {
1013 "repo": user_did,
1014 "collection": "stream.thought.reasoning",
1015 "record": reasoning_record
1016 }
1017
1018 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
1019 response.raise_for_status()
1020 result = response.json()
1021
1022 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
1023 return result
1024
1025 except Exception as e:
1026 logger.error(f"Error creating reasoning record: {e}")
1027 return None
1028
1029
1030if __name__ == "__main__":
1031 client = default_login()
1032 # do something with the client
1033 logger.info("Client is ready to use!")