A digital person for Bluesky
1import os
2import logging
3from typing import Optional, Dict, Any, List
4from atproto_client import Client, Session, SessionEvent, models
5
# Configure logging
# basicConfig runs at import time and sets the root level to INFO, so the
# logger.debug(...) calls in this module are not emitted unless the level is
# lowered elsewhere. Each line carries timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-wide logger shared by the helper functions in this file.
logger = logging.getLogger("bluesky_session_handler")
11
12# Load the environment variables
13import dotenv
14dotenv.load_dotenv(override=True)
15
16import yaml
17import json
18
# Strip fields. A list of fields to remove from a JSON object.
# thread_to_yaml_string passes this list to strip_fields when it is called
# with strip_metadata=True; any dict key that matches one of these names is
# dropped at every nesting level.
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]
def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Dicts, lists and tuples are rebuilt recursively (tuples become lists),
    objects exposing ``__dict__`` are replaced by a dict of their attributes,
    primitives pass through unchanged, and anything else falls back to
    ``str(obj)``. Dict keys are kept as they are; only values are converted.

    Args:
        obj: The value to convert.

    Returns:
        A new structure whose values are dicts, lists, str, int, float, bool
        or None.
    """
    # Containers are checked before ``__dict__``: dict/list subclasses (for
    # example collections.OrderedDict) also expose ``__dict__``, and
    # converting that instead would silently drop their contents.
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Tuples are emitted as lists rather than as their repr string.
        return [convert_to_basic_types(item) for item in obj]
    if hasattr(obj, '__dict__'):
        # Convert objects with __dict__ to their dictionary representation
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    # For other types, fall back to the string form
    return str(obj)
79
80
def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object, in place.

    For dicts, a key is removed when it appears in ``strip_field_list`` or is
    a string starting with ``"__"``. Remaining values are processed
    recursively and then dropped if they end up as None, an empty dict, an
    empty list or a blank string. For lists, every item is processed
    recursively and None items are removed; other empty items are kept.

    Args:
        obj: The dict, list or scalar to clean. Dicts and lists are mutated.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj`` that was passed in, after cleaning.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys because entries are deleted.
        for key in list(obj):
            # Only string keys can carry the "__" prefix; guarding the check
            # keeps non-string keys (ints, tuples, ...) from raising.
            is_dunder = isinstance(key, str) and key.startswith("__")
            if key in strip_field_list or is_dunder:
                del obj[key]
                continue

            cleaned = strip_fields(obj[key], strip_field_list)

            # Drop values that carry no information after stripping.
            is_empty = (
                cleaned is None
                or (isinstance(cleaned, (dict, list)) and len(cleaned) == 0)
                or (isinstance(cleaned, str) and cleaned.strip() == "")
            )
            if is_empty:
                del obj[key]
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        processed = [strip_fields(item, strip_field_list) for item in obj]
        # Slice assignment keeps the caller's list object; only None is removed.
        obj[:] = [item for item in processed if item is not None]

    return obj
114
115
def flatten_thread_structure(thread_data):
    """
    Turn a nested thread into a flat list of post dictionaries.

    Only the ``parent`` chain is followed, so the result holds the starting
    node's ancestors (oldest ancestor first) followed by the starting node
    itself. Nodes without a truthy ``post`` contribute nothing.

    Args:
        thread_data: Object whose ``thread`` attribute is the starting node
            (presumably the response of get_post_thread).

    Returns:
        Dict with a single 'posts' key holding the list of post dicts
    """

    def snapshot(post):
        """Return a shallow dict copy of a post, or {} if it has no dict form."""
        if hasattr(post, '__dict__'):
            return post.__dict__.copy()
        if isinstance(post, dict):
            return post.copy()
        return {}

    def walk(node):
        """Yield post snapshots for the node's ancestors, then the node itself."""
        if not node:
            return
        ancestor = getattr(node, 'parent', None)
        if ancestor:
            # Ancestors come first so the output reads top-down.
            yield from walk(ancestor)
        post = getattr(node, 'post', None)
        if post:
            yield snapshot(post)

    # Locate the starting node, by attribute first and instance dict second.
    if hasattr(thread_data, 'thread'):
        start = thread_data.thread
    elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
        start = thread_data.__dict__['thread']
    else:
        start = None

    return {'posts': list(walk(start))}
158
159
def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Render a thread as a YAML string intended for LLM consumption.

    The thread is flattened with flatten_thread_structure, reduced to basic
    Python types with convert_to_basic_types and, optionally, cleaned with
    strip_fields using STRIP_FIELDS before being dumped.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: When True, remove the STRIP_FIELDS keys (and any
            values left empty) before dumping

    Returns:
        YAML text in block style, 2-space indent, unicode left unescaped
    """
    # Flatten first so the YAML is a simple list instead of a deep tree.
    payload = convert_to_basic_types(flatten_thread_structure(thread))

    if strip_metadata:
        # strip_fields edits the freshly built structure in place and returns it.
        payload = strip_fields(payload, STRIP_FIELDS)

    return yaml.dump(payload, indent=2, allow_unicode=True, default_flow_style=False)
184
185
186
187
188
189
190
def get_session(username: str) -> Optional[str]:
    """
    Read the saved session string for a user.

    Args:
        username: Account name used to build the ``session_<username>.txt``
            file name, resolved against the current working directory

    Returns:
        The file's full text, or None when the file does not exist
    """
    session_path = f"session_{username}.txt"
    try:
        session_file = open(session_path, encoding="UTF-8")
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None
    with session_file:
        return session_file.read()
198
def save_session(username: str, session_string: str) -> None:
    """
    Write a session string to ``session_<username>.txt``, replacing any
    previous content.

    Args:
        username: Account name used to build the file name
        session_string: The exported session text to store
    """
    session_path = f"session_{username}.txt"
    with open(session_path, "w", encoding="UTF-8") as session_file:
        session_file.write(session_string)
    logger.debug(f"Session saved for {username}")
203
def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """
    Session-change callback: persist the session on CREATE and REFRESH events.

    Args:
        username: Account whose session file should be updated
        event: The kind of session change reported by the client
        session: The session object; its export() output is what gets saved
    """
    logger.debug(f"Session changed: {event} {repr(session)}")

    # Any other event carries nothing that needs to be written to disk.
    if event not in (SessionEvent.CREATE, SessionEvent.REFRESH):
        return

    logger.debug(f"Saving changed session for {username}")
    save_session(username, session.export())
209
def init_client(username: str, password: str) -> Client:
    """
    Create a Client for the configured PDS and log it in.

    The PDS address comes from the PDS_URI environment variable and falls
    back to https://bsky.social when it is unset or empty. A saved session
    (see get_session) is reused when present; if that login attempt fails,
    for example because the saved session is stale or corrupt, the function
    falls back to a username/password login instead of giving up.

    Args:
        username: Account handle, also used to locate the saved session
        password: Account password, used when no usable session exists

    Returns:
        The logged-in Client

    Raises:
        Exception: Whatever the username/password login raises on failure.
    """
    pds_uri = os.getenv("PDS_URI")
    # An empty PDS_URI is as unusable as a missing one, so both fall back.
    if not pds_uri:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    # Log the PDS URI
    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    # Route session events to on_session_change so new sessions get saved.
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.debug(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # A saved session that no longer works must not block login.
            logger.warning(
                f"Saved session for {username} could not be reused ({e}); logging in with credentials instead"
            )
    else:
        logger.debug(f"Creating new session for {username}")

    client.login(username, password)
    return client
235
236
def default_login() -> Client:
    """
    Log in using the BSKY_USERNAME and BSKY_PASSWORD environment variables.

    Returns:
        The logged-in Client produced by init_client

    Raises:
        SystemExit: With exit status 1 when either variable is missing or
            empty, after logging which one is needed.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    # Raising SystemExit(1) instead of calling exit() avoids depending on the
    # site-provided exit helper and reports the failure with a non-zero status.
    if not username:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        raise SystemExit(1)

    if not password:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        raise SystemExit(1)

    return init_client(username, password)
254
def remove_outside_quotes(text: str) -> str:
    """
    Remove one pair of enclosing double quotes from response text.

    Only double quotes are handled, so contractions and single-quoted text
    are left alone:
    - Double quotes: "text" -> text
    - Single quotes and internal quotes are preserved

    Surrounding whitespace is stripped before the quotes are examined. Inputs
    that are empty or a single character are returned untouched.

    Args:
        text: The text to process

    Returns:
        The stripped text, without its outer double quotes if it had a pair
    """
    if not text or len(text) < 2:
        return text

    stripped = text.strip()

    # The length is re-checked after stripping: a lone '"' padded with
    # whitespace both starts and ends with a quote, and slicing it would
    # wrongly produce an empty string.
    if len(stripped) >= 2 and stripped.startswith('"') and stripped.endswith('"'):
        return stripped[1:-1]

    return stripped
279
def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    Post a reply on Bluesky, attaching rich-text facets for mentions and links.

    Mentions are resolved to DIDs with a profile lookup; a mention whose
    lookup fails is left as plain text. Facet offsets are byte offsets into
    the UTF-8 encoding of ``text``.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: URI of the parent post (the one being replied to)
        reply_to_cid: CID of the parent post
        root_uri: URI of the thread's root post. When None, the parent is
            treated as the root and root_cid is replaced by reply_to_cid too
        root_cid: CID of the thread's root post
        lang: Language code for the post (e.g., 'en-US', 'es', 'ja')

    Returns:
        The response from sending the post
    """
    import re

    # No explicit root means the parent post is itself the thread root.
    if root_uri is None:
        root_uri, root_cid = reply_to_uri, reply_to_cid

    # Strong references for both ends of the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    facets = []
    encoded_text = text.encode("UTF-8")

    # Mentions: group 1 is the "@handle" itself, without the leading delimiter
    mention_pattern = re.compile(rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)")

    for match in mention_pattern.finditer(encoded_text):
        handle = match.group(1)[1:].decode("UTF-8")  # drop the "@"
        try:
            # Look the handle up to obtain its DID
            profile = client.app.bsky.actor.get_profile({'actor': handle})
            if profile and hasattr(profile, 'did'):
                span = models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=match.start(1),
                    byteEnd=match.end(1)
                )
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        index=span,
                        features=[models.AppBskyRichtextFacet.Mention(did=profile.did)]
                    )
                )
        except Exception as e:
            # An unresolved mention simply gets no facet.
            logger.debug(f"Failed to resolve handle {handle}: {e}")

    # Links: group 1 is the URL itself, without the leading delimiter
    url_pattern = re.compile(rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)")

    for match in url_pattern.finditer(encoded_text):
        link = match.group(1).decode("UTF-8")
        span = models.AppBskyRichtextFacet.ByteSlice(
            byteStart=match.start(1),
            byteEnd=match.end(1)
        )
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=span,
                features=[models.AppBskyRichtextFacet.Link(uri=link)]
            )
        )

    # The facets argument is only passed when at least one facet was found.
    post_kwargs = {
        "text": text,
        "reply_to": models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
    }
    if facets:
        post_kwargs["facets"] = facets
    post_kwargs["langs"] = [lang] if lang else None

    response = client.send_post(**post_kwargs)

    logger.info(f"Reply sent successfully: {response.uri}")
    return response
371
372
def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Fetch the thread around a post.

    The request asks for up to 60 levels of parents and replies down to
    depth 10.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread response, or None if the request raised (the error is logged)
    """
    request_params = {'uri': uri, 'parent_height': 60, 'depth': 10}
    try:
        return client.app.bsky.feed.get_post_thread(request_params)
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None
390
391
def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
    """
    Reply to the post behind a notification (mention or reply).

    The notification may be a dict or an object with ``uri``/``cid``
    attributes. When its record carries reply info with a usable root, that
    root is reused so the reply joins the existing thread; otherwise the
    notification's own post is treated as the root.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")

    Returns:
        The response from sending the reply, or None if the notification
        lacks uri/cid or any step raised (the error is logged)
    """
    try:
        # Pull the post reference and reply info out of either shape.
        post_uri = post_cid = reply_info = None
        if isinstance(notification, dict):
            post_uri = notification.get('uri')
            post_cid = notification.get('cid')
            record = notification.get('record', {})
            if isinstance(record, dict):
                reply_info = record.get('reply')
        elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
            post_uri = notification.uri
            post_cid = notification.cid
            if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
                reply_info = notification.record.reply

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Start from "this post is the root" and override only when the
        # reply info supplies a usable root reference.
        root_uri, root_cid = post_uri, post_cid
        if reply_info:
            if isinstance(reply_info, dict):
                root_ref = reply_info.get('root')
                if root_ref and isinstance(root_ref, dict):
                    root_uri = root_ref.get('uri', post_uri)
                    root_cid = root_ref.get('cid', post_cid)
            elif hasattr(reply_info, 'root'):
                if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
                    root_uri = reply_info.root.uri
                    root_cid = reply_info.root.cid

        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None
470
471
def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previous one, all sharing the
    same thread root. If a message cannot be posted, a system failure notice
    is posted in its place and the chain continues from that notice; if the
    notice cannot be posted either, sending stops.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
            (dict or object with uri/cid)
        reply_messages: List of reply texts (at most 15; the length of each
            text is not checked here)
        lang: Language code for the posts (defaults to "en-US")

    Returns:
        List of responses for the posts that were sent (failure notices
        included). None if the input is invalid, the notification lacks
        uri/cid, nothing could be sent, or an unexpected error occurred.
    """
    try:
        # Validate input
        if not reply_messages:
            logger.error("Reply messages list cannot be empty")
            return None
        if len(reply_messages) > 15:
            logger.error(f"Cannot send more than 15 reply messages (got {len(reply_messages)})")
            return None

        # Get the post URI and CID from the notification (handle both dict and object)
        post_uri = post_cid = reply_info = None
        if isinstance(notification, dict):
            post_uri = notification.get('uri')
            post_cid = notification.get('cid')
            record = notification.get('record', {})
            if isinstance(record, dict):
                reply_info = record.get('reply')
        elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
            post_uri = notification.uri
            post_cid = notification.cid
            if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
                reply_info = notification.record.reply

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Determine root: use the root from the reply info when it is usable,
        # otherwise this post IS the root.
        root_uri, root_cid = post_uri, post_cid
        if reply_info:
            if isinstance(reply_info, dict):
                root_ref = reply_info.get('root')
                if root_ref and isinstance(root_ref, dict):
                    root_uri = root_ref.get('uri', post_uri)
                    root_cid = root_ref.get('cid', post_cid)
            elif hasattr(reply_info, 'root'):
                if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
                    root_uri = reply_info.root.uri
                    root_cid = reply_info.root.cid

        def send_one(text, parent_uri, parent_cid):
            """Post one reply; return None instead of raising on failure."""
            # reply_to_post signals failure by raising. Converting that to
            # None here is what makes the recovery path below reachable and
            # keeps already-sent replies from being discarded.
            try:
                return reply_to_post(
                    client=client,
                    text=text,
                    reply_to_uri=parent_uri,
                    reply_to_cid=parent_cid,
                    root_uri=root_uri,
                    root_cid=root_cid,
                    lang=lang
                )
            except Exception as send_error:
                logger.error(f"Error sending reply: {send_error}")
                return None

        # Send replies in sequence, creating a thread
        responses = []
        current_parent_uri = post_uri
        current_parent_cid = post_cid

        for i, message in enumerate(reply_messages):
            logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")

            response = send_one(message, current_parent_uri, current_parent_cid)

            if not response:
                logger.error(f"Failed to send reply {i+1}, posting system failure message")
                # Post a visible placeholder so the thread shows the gap
                response = send_one(
                    "[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                    current_parent_uri,
                    current_parent_cid,
                )
                if not response:
                    logger.error("Could not even send system failure message, stopping thread")
                    return responses if responses else None

            responses.append(response)
            # The post just sent becomes the parent of the next one.
            current_parent_uri = response.uri
            current_parent_cid = response.cid

        logger.info(f"Successfully sent {len(responses)} threaded replies")
        return responses

    except Exception as e:
        logger.error(f"Error sending threaded reply to notification: {e}")
        return None
595
596
def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
    """
    Write a stream.thought.ack record that is not tied to any post.

    The record is sent with a null ``subject`` through a direct HTTP call to
    the PDS named by PDS_URI (default https://bsky.social), authenticated
    with the access token read from the client's session.

    Args:
        client: Authenticated Bluesky client
        note: The synthesis note/content

    Returns:
        The parsed JSON body of the createRecord response, or None if the
        session data is unavailable or any step raised (the error is logged)
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Read the bearer token and repo DID from wherever the client keeps them.
        session = getattr(client, '_session', None)
        if session:
            access_token, user_did = session.access_jwt, session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not (access_token and user_did):
            logger.error("Missing access token or DID from session")
            return None

        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        # UTC timestamp with a "Z" suffix instead of "+00:00"
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        payload = {
            "repo": user_did,
            "collection": "stream.thought.ack",
            "record": {
                "$type": "stream.thought.ack",
                "subject": None,  # Null subject marks a synthesis ack
                "createdAt": timestamp,
                "note": note
            }
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10,
        )
        response.raise_for_status()
        result = response.json()

        logger.info("Successfully created synthesis acknowledgment")
        return result

    except Exception as e:
        logger.error(f"Error creating synthesis acknowledgment: {e}")
        return None
665
666
def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Write a stream.thought.ack record pointing at a post.

    This is a custom acknowledgment record rather than a standard Bluesky
    like. It is sent through a direct HTTP call to the PDS named by PDS_URI
    (default https://bsky.social), authenticated with the access token read
    from the client's session.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach; sent as null when omitted

    Returns:
        The parsed JSON body of the createRecord response, or None if the
        session data is unavailable or any step raised (the error is logged)
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Read the bearer token and repo DID from wherever the client keeps them.
        session = getattr(client, '_session', None)
        if session:
            access_token, user_did = session.access_jwt, session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not (access_token and user_did):
            logger.error("Missing access token or DID from session")
            return None

        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        # UTC timestamp with a "Z" suffix instead of "+00:00"
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        payload = {
            "repo": user_did,
            "collection": "stream.thought.ack",
            "record": {
                "$type": "stream.thought.ack",
                "subject": {
                    "uri": post_uri,
                    "cid": post_cid
                },
                "createdAt": timestamp,
                "note": note
            }
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10,
        )
        response.raise_for_status()
        result = response.json()

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None
742
743
def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Write a stream.thought.tool.call record describing one tool invocation.

    The record is sent through a direct HTTP call to the PDS named by
    PDS_URI (default https://bsky.social), authenticated with the access
    token read from the client's session.

    Args:
        client: Authenticated Bluesky client
        tool_name: Name of the tool being called
        arguments: Tool arguments, stored as the string that was passed in
        tool_call_id: Optional ID of the tool call; only included in the
            record when truthy

    Returns:
        The parsed JSON body of the createRecord response, or None if the
        session data is unavailable or any step raised (the error is logged)
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Read the bearer token and repo DID from wherever the client keeps them.
        session = getattr(client, '_session', None)
        if session:
            access_token, user_did = session.access_jwt, session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not (access_token and user_did):
            logger.error("Missing access token or DID from session")
            return None

        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        # UTC timestamp with a "Z" suffix instead of "+00:00"
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        tool_record = {
            "$type": "stream.thought.tool.call",
            "tool_name": tool_name,
            "arguments": arguments,  # kept as a string, not parsed
            "createdAt": timestamp
        }
        if tool_call_id:
            tool_record["tool_call_id"] = tool_call_id

        payload = {
            "repo": user_did,
            "collection": "stream.thought.tool.call",
            "record": tool_record
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10,
        )
        # Log the response body before raise_for_status, for diagnosis.
        if response.status_code != 200:
            logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}")
        response.raise_for_status()
        result = response.json()

        logger.debug(f"Successfully recorded tool call: {tool_name}")
        return result

    except Exception as e:
        logger.error(f"Error creating tool call record: {e}")
        return None
821
822
def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
    """
    Write a stream.thought.reasoning record holding the agent's reasoning.

    The record is sent through a direct HTTP call to the PDS named by
    PDS_URI (default https://bsky.social), authenticated with the access
    token read from the client's session.

    Args:
        client: Authenticated Bluesky client
        reasoning_text: The reasoning text from the agent

    Returns:
        The parsed JSON body of the createRecord response, or None if the
        session data is unavailable or any step raised (the error is logged)
    """
    try:
        import requests
        from datetime import datetime, timezone

        # Read the bearer token and repo DID from wherever the client keeps them.
        session = getattr(client, '_session', None)
        if session:
            access_token, user_did = session.access_jwt, session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not (access_token and user_did):
            logger.error("Missing access token or DID from session")
            return None

        pds_host = os.getenv("PDS_URI", "https://bsky.social")

        # UTC timestamp with a "Z" suffix instead of "+00:00"
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        payload = {
            "repo": user_did,
            "collection": "stream.thought.reasoning",
            "record": {
                "$type": "stream.thought.reasoning",
                "reasoning": reasoning_text,
                "createdAt": timestamp
            }
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10,
        )
        response.raise_for_status()
        result = response.json()

        logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
        return result

    except Exception as e:
        logger.error(f"Error creating reasoning record: {e}")
        return None
891
892
if __name__ == "__main__":
    # Running the module directly only checks that login works with the
    # credentials found in the environment.
    bsky_client = default_login()
    logger.info("Client is ready to use!")