a digital person for bluesky
1import os
2import logging
3from typing import Optional, Dict, Any, List
4from atproto_client import Client, Session, SessionEvent, models
5
6# Configure logging
7logging.basicConfig(
8 level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
9)
10logger = logging.getLogger("bluesky_session_handler")
11
12# Load the environment variables
13import dotenv
14dotenv.load_dotenv(override=True)
15
16import yaml
17import json
18
19# Strip fields. A list of fields to remove from a JSON object
20STRIP_FIELDS = [
21 "cid",
22 "rev",
23 "did",
24 "uri",
25 "langs",
26 "threadgate",
27 "py_type",
28 "labels",
29 "avatar",
30 "viewer",
31 "indexed_at",
32 "tags",
33 "associated",
34 "thread_context",
35 "aspect_ratio",
36 "thumb",
37 "fullsize",
38 "root",
39 "created_at",
40 "verification",
41 "like_count",
42 "quote_count",
43 "reply_count",
44 "repost_count",
45 "embedding_disabled",
46 "thread_muted",
47 "reply_disabled",
48 "pinned",
49 "like",
50 "repost",
51 "blocked_by",
52 "blocking",
53 "blocking_by_list",
54 "followed_by",
55 "following",
56 "known_followers",
57 "muted",
58 "muted_by_list",
59 "root_author_like",
60 "entities",
61 "ref",
62 "mime_type",
63 "size",
64]
65def convert_to_basic_types(obj):
66 """Convert complex Python objects to basic types for JSON/YAML serialization."""
67 if hasattr(obj, '__dict__'):
68 # Convert objects with __dict__ to their dictionary representation
69 return convert_to_basic_types(obj.__dict__)
70 elif isinstance(obj, dict):
71 return {key: convert_to_basic_types(value) for key, value in obj.items()}
72 elif isinstance(obj, list):
73 return [convert_to_basic_types(item) for item in obj]
74 elif isinstance(obj, (str, int, float, bool)) or obj is None:
75 return obj
76 else:
77 # For other types, try to convert to string
78 return str(obj)
79
80
81def strip_fields(obj, strip_field_list):
82 """Recursively strip fields from a JSON object."""
83 if isinstance(obj, dict):
84 keys_flagged_for_removal = []
85
86 # Remove fields from strip list and pydantic metadata
87 for field in list(obj.keys()):
88 if field in strip_field_list or field.startswith("__"):
89 keys_flagged_for_removal.append(field)
90
91 # Remove flagged keys
92 for key in keys_flagged_for_removal:
93 obj.pop(key, None)
94
95 # Recursively process remaining values
96 for key, value in list(obj.items()):
97 obj[key] = strip_fields(value, strip_field_list)
98 # Remove empty/null values after processing
99 if (
100 obj[key] is None
101 or (isinstance(obj[key], dict) and len(obj[key]) == 0)
102 or (isinstance(obj[key], list) and len(obj[key]) == 0)
103 or (isinstance(obj[key], str) and obj[key].strip() == "")
104 ):
105 obj.pop(key, None)
106
107 elif isinstance(obj, list):
108 for i, value in enumerate(obj):
109 obj[i] = strip_fields(value, strip_field_list)
110 # Remove None values from list
111 obj[:] = [item for item in obj if item is not None]
112
113 return obj
114
115
116def flatten_thread_structure(thread_data):
117 """
118 Flatten a nested thread structure into a list while preserving all data.
119
120 Args:
121 thread_data: The thread data from get_post_thread
122
123 Returns:
124 Dict with 'posts' key containing a list of posts in chronological order
125 """
126 posts = []
127
128 def traverse_thread(node):
129 """Recursively traverse the thread structure to collect posts."""
130 if not node:
131 return
132
133 # If this node has a parent, traverse it first (to maintain chronological order)
134 if hasattr(node, 'parent') and node.parent:
135 traverse_thread(node.parent)
136
137 # Then add this node's post
138 if hasattr(node, 'post') and node.post:
139 # Convert to dict if needed to ensure we can process it
140 if hasattr(node.post, '__dict__'):
141 post_dict = node.post.__dict__.copy()
142 elif isinstance(node.post, dict):
143 post_dict = node.post.copy()
144 else:
145 post_dict = {}
146
147 posts.append(post_dict)
148
149 # Handle the thread structure
150 if hasattr(thread_data, 'thread'):
151 # Start from the main thread node
152 traverse_thread(thread_data.thread)
153 elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
154 traverse_thread(thread_data.__dict__['thread'])
155
156 # Return a simple structure with posts list
157 return {'posts': posts}
158
159
160def thread_to_yaml_string(thread, strip_metadata=True):
161 """
162 Convert thread data to a YAML-formatted string for LLM parsing.
163
164 Args:
165 thread: The thread data from get_post_thread
166 strip_metadata: Whether to strip metadata fields for cleaner output
167
168 Returns:
169 YAML-formatted string representation of the thread
170 """
171 # First flatten the thread structure to avoid deep nesting
172 flattened = flatten_thread_structure(thread)
173
174 # Convert complex objects to basic types
175 basic_thread = convert_to_basic_types(flattened)
176
177 if strip_metadata:
178 # Create a copy and strip unwanted fields
179 cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
180 else:
181 cleaned_thread = basic_thread
182
183 return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)
184
185
186
187
188
189
190
191def get_session(username: str) -> Optional[str]:
192 try:
193 with open(f"session_{username}.txt", encoding="UTF-8") as f:
194 return f.read()
195 except FileNotFoundError:
196 logger.debug(f"No existing session found for {username}")
197 return None
198
199def save_session(username: str, session_string: str) -> None:
200 with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
201 f.write(session_string)
202 logger.debug(f"Session saved for {username}")
203
204def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
205 logger.debug(f"Session changed: {event} {repr(session)}")
206 if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
207 logger.debug(f"Saving changed session for {username}")
208 save_session(username, session.export())
209
210def init_client(username: str, password: str) -> Client:
211 from config_loader import get_bluesky_config
212 try:
213 bluesky_config = get_bluesky_config()
214 pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social')
215 except (ValueError, KeyError):
216 pds_uri = os.getenv("PDS_URI", "https://bsky.social")
217 logger.warning(
218 "Failed to load PDS URI from config. Using environment variable or default."
219 )
220
221 # Print the PDS URI
222 logger.debug(f"Using PDS URI: {pds_uri}")
223
224 client = Client(pds_uri)
225 client.on_session_change(
226 lambda event, session: on_session_change(username, event, session)
227 )
228
229 session_string = get_session(username)
230 if session_string:
231 logger.debug(f"Reusing existing session for {username}")
232 client.login(session_string=session_string)
233 else:
234 logger.debug(f"Creating new session for {username}")
235 client.login(username, password)
236
237 return client
238
239
240def default_login(config_path: str = "config.yaml") -> Client:
241 from config_loader import get_bluesky_config
242 try:
243 bluesky_config = get_bluesky_config(config_path)
244 username = bluesky_config['username']
245 password = bluesky_config['password']
246 except (ValueError, KeyError) as e:
247 logger.error(f"Failed to load Bluesky configuration: {e}")
248 exit()
249
250
251 if username is None:
252 logger.error(
253 "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
254 )
255 exit()
256
257 if password is None:
258 logger.error(
259 "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
260 )
261 exit()
262
263 return init_client(username, password)
264
265def remove_outside_quotes(text: str) -> str:
266 """
267 Remove outside double quotes from response text.
268
269 Only handles double quotes to avoid interfering with contractions:
270 - Double quotes: "text" → text
271 - Preserves single quotes and internal quotes
272
273 Args:
274 text: The text to process
275
276 Returns:
277 Text with outside double quotes removed
278 """
279 if not text or len(text) < 2:
280 return text
281
282 text = text.strip()
283
284 # Only remove double quotes from start and end
285 if text.startswith('"') and text.endswith('"'):
286 return text[1:-1]
287
288 return text
289
290def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
291 """
292 Reply to a post on Bluesky with rich text support.
293
294 Args:
295 client: Authenticated Bluesky client
296 text: The reply text
297 reply_to_uri: The URI of the post being replied to (parent)
298 reply_to_cid: The CID of the post being replied to (parent)
299 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
300 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
301 lang: Language code for the post (e.g., 'en-US', 'es', 'ja')
302
303 Returns:
304 The response from sending the post
305 """
306 import re
307
308 # If root is not provided, this is a reply to the root post
309 if root_uri is None:
310 root_uri = reply_to_uri
311 root_cid = reply_to_cid
312
313 # Create references for the reply
314 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
315 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))
316
317 # Parse rich text facets (mentions and URLs)
318 facets = []
319 text_bytes = text.encode("UTF-8")
320
321 # Parse mentions - fixed to handle @ at start of text
322 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
323
324 for m in re.finditer(mention_regex, text_bytes):
325 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix
326 # Adjust byte positions to account for the optional prefix
327 mention_start = m.start(1)
328 mention_end = m.end(1)
329 try:
330 # Resolve handle to DID using the API
331 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
332 if resolve_resp and hasattr(resolve_resp, 'did'):
333 facets.append(
334 models.AppBskyRichtextFacet.Main(
335 index=models.AppBskyRichtextFacet.ByteSlice(
336 byteStart=mention_start,
337 byteEnd=mention_end
338 ),
339 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
340 )
341 )
342 except Exception as e:
343 logger.debug(f"Failed to resolve handle {handle}: {e}")
344 continue
345
346 # Parse URLs - fixed to handle URLs at start of text
347 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
348
349 for m in re.finditer(url_regex, text_bytes):
350 url = m.group(1).decode("UTF-8")
351 # Adjust byte positions to account for the optional prefix
352 url_start = m.start(1)
353 url_end = m.end(1)
354 facets.append(
355 models.AppBskyRichtextFacet.Main(
356 index=models.AppBskyRichtextFacet.ByteSlice(
357 byteStart=url_start,
358 byteEnd=url_end
359 ),
360 features=[models.AppBskyRichtextFacet.Link(uri=url)]
361 )
362 )
363
364 # Send the reply with facets if any were found
365 if facets:
366 response = client.send_post(
367 text=text,
368 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
369 facets=facets,
370 langs=[lang] if lang else None
371 )
372 else:
373 response = client.send_post(
374 text=text,
375 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
376 langs=[lang] if lang else None
377 )
378
379 logger.info(f"Reply sent successfully: {response.uri}")
380 return response
381
382
383def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
384 """
385 Get the thread containing a post to find root post information.
386
387 Args:
388 client: Authenticated Bluesky client
389 uri: The URI of the post
390
391 Returns:
392 The thread data or None if not found
393 """
394 try:
395 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
396 return thread
397 except Exception as e:
398 logger.error(f"Error fetching post thread: {e}")
399 return None
400
401
402def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US") -> Optional[Dict[str, Any]]:
403 """
404 Reply to a notification (mention or reply).
405
406 Args:
407 client: Authenticated Bluesky client
408 notification: The notification object from list_notifications
409 reply_text: The text to reply with
410 lang: Language code for the post (defaults to "en-US")
411
412 Returns:
413 The response from sending the reply or None if failed
414 """
415 try:
416 # Get the post URI and CID from the notification (handle both dict and object)
417 if isinstance(notification, dict):
418 post_uri = notification.get('uri')
419 post_cid = notification.get('cid')
420 # Check if the notification record has reply info with root
421 record = notification.get('record', {})
422 reply_info = record.get('reply') if isinstance(record, dict) else None
423 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
424 post_uri = notification.uri
425 post_cid = notification.cid
426 # Check if the notification record has reply info with root
427 reply_info = None
428 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
429 reply_info = notification.record.reply
430 else:
431 post_uri = None
432 post_cid = None
433 reply_info = None
434
435 if not post_uri or not post_cid:
436 logger.error("Notification doesn't have required uri/cid fields")
437 return None
438
439 # Determine root: if post has reply info, use its root; otherwise this post IS the root
440 if reply_info:
441 # Extract root from the notification's reply structure
442 if isinstance(reply_info, dict):
443 root_ref = reply_info.get('root')
444 if root_ref and isinstance(root_ref, dict):
445 root_uri = root_ref.get('uri', post_uri)
446 root_cid = root_ref.get('cid', post_cid)
447 else:
448 # No root in reply info, use post as root
449 root_uri = post_uri
450 root_cid = post_cid
451 elif hasattr(reply_info, 'root'):
452 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
453 root_uri = reply_info.root.uri
454 root_cid = reply_info.root.cid
455 else:
456 root_uri = post_uri
457 root_cid = post_cid
458 else:
459 root_uri = post_uri
460 root_cid = post_cid
461 else:
462 # No reply info means this post IS the root
463 root_uri = post_uri
464 root_cid = post_cid
465
466 # Reply to the notification
467 return reply_to_post(
468 client=client,
469 text=reply_text,
470 reply_to_uri=post_uri,
471 reply_to_cid=post_cid,
472 root_uri=root_uri,
473 root_cid=root_cid,
474 lang=lang
475 )
476
477 except Exception as e:
478 logger.error(f"Error replying to notification: {e}")
479 return None
480
481
482def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US") -> Optional[List[Dict[str, Any]]]:
483 """
484 Reply to a notification with a threaded chain of messages (max 15).
485
486 Args:
487 client: Authenticated Bluesky client
488 notification: The notification object from list_notifications
489 reply_messages: List of reply texts (max 15 messages, each max 300 chars)
490 lang: Language code for the posts (defaults to "en-US")
491
492 Returns:
493 List of responses from sending the replies or None if failed
494 """
495 try:
496 # Validate input
497 if not reply_messages or len(reply_messages) == 0:
498 logger.error("Reply messages list cannot be empty")
499 return None
500 if len(reply_messages) > 15:
501 logger.error(f"Cannot send more than 15 reply messages (got {len(reply_messages)})")
502 return None
503
504 # Get the post URI and CID from the notification (handle both dict and object)
505 if isinstance(notification, dict):
506 post_uri = notification.get('uri')
507 post_cid = notification.get('cid')
508 # Check if the notification record has reply info with root
509 record = notification.get('record', {})
510 reply_info = record.get('reply') if isinstance(record, dict) else None
511 elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
512 post_uri = notification.uri
513 post_cid = notification.cid
514 # Check if the notification record has reply info with root
515 reply_info = None
516 if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
517 reply_info = notification.record.reply
518 else:
519 post_uri = None
520 post_cid = None
521 reply_info = None
522
523 if not post_uri or not post_cid:
524 logger.error("Notification doesn't have required uri/cid fields")
525 return None
526
527 # Determine root: if post has reply info, use its root; otherwise this post IS the root
528 if reply_info:
529 # Extract root from the notification's reply structure
530 if isinstance(reply_info, dict):
531 root_ref = reply_info.get('root')
532 if root_ref and isinstance(root_ref, dict):
533 root_uri = root_ref.get('uri', post_uri)
534 root_cid = root_ref.get('cid', post_cid)
535 else:
536 # No root in reply info, use post as root
537 root_uri = post_uri
538 root_cid = post_cid
539 elif hasattr(reply_info, 'root'):
540 if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
541 root_uri = reply_info.root.uri
542 root_cid = reply_info.root.cid
543 else:
544 root_uri = post_uri
545 root_cid = post_cid
546 else:
547 root_uri = post_uri
548 root_cid = post_cid
549 else:
550 # No reply info means this post IS the root
551 root_uri = post_uri
552 root_cid = post_cid
553
554 # Send replies in sequence, creating a thread
555 responses = []
556 current_parent_uri = post_uri
557 current_parent_cid = post_cid
558
559 for i, message in enumerate(reply_messages):
560 logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")
561
562 # Send this reply
563 response = reply_to_post(
564 client=client,
565 text=message,
566 reply_to_uri=current_parent_uri,
567 reply_to_cid=current_parent_cid,
568 root_uri=root_uri,
569 root_cid=root_cid,
570 lang=lang
571 )
572
573 if not response:
574 logger.error(f"Failed to send reply {i+1}, posting system failure message")
575 # Try to post a system failure message
576 failure_response = reply_to_post(
577 client=client,
578 text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
579 reply_to_uri=current_parent_uri,
580 reply_to_cid=current_parent_cid,
581 root_uri=root_uri,
582 root_cid=root_cid,
583 lang=lang
584 )
585 if failure_response:
586 responses.append(failure_response)
587 current_parent_uri = failure_response.uri
588 current_parent_cid = failure_response.cid
589 else:
590 logger.error("Could not even send system failure message, stopping thread")
591 return responses if responses else None
592 else:
593 responses.append(response)
594 # Update parent references for next reply (if any)
595 if i < len(reply_messages) - 1: # Not the last message
596 current_parent_uri = response.uri
597 current_parent_cid = response.cid
598
599 logger.info(f"Successfully sent {len(responses)} threaded replies")
600 return responses
601
602 except Exception as e:
603 logger.error(f"Error sending threaded reply to notification: {e}")
604 return None
605
606
607def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
608 """
609 Create a stream.thought.ack record for synthesis without a target post.
610
611 This creates a synthesis acknowledgment with null subject field.
612
613 Args:
614 client: Authenticated Bluesky client
615 note: The synthesis note/content
616
617 Returns:
618 The response from creating the acknowledgment record or None if failed
619 """
620 try:
621 import requests
622 import json
623 from datetime import datetime, timezone
624
625 # Get session info from the client
626 access_token = None
627 user_did = None
628
629 # Try different ways to get the session info
630 if hasattr(client, '_session') and client._session:
631 access_token = client._session.access_jwt
632 user_did = client._session.did
633 elif hasattr(client, 'access_jwt'):
634 access_token = client.access_jwt
635 user_did = client.did if hasattr(client, 'did') else None
636 else:
637 logger.error("Cannot access client session information")
638 return None
639
640 if not access_token or not user_did:
641 logger.error("Missing access token or DID from session")
642 return None
643
644 from config_loader import get_bluesky_config
645 try:
646 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social')
647 except:
648 pds_host = os.getenv("PDS_URI", "https://bsky.social")
649
650 # Create acknowledgment record with null subject
651 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
652 ack_record = {
653 "$type": "stream.thought.ack",
654 "subject": None, # Null subject for synthesis
655 "createdAt": now,
656 "note": note
657 }
658
659 # Create the record
660 headers = {"Authorization": f"Bearer {access_token}"}
661 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
662
663 create_data = {
664 "repo": user_did,
665 "collection": "stream.thought.ack",
666 "record": ack_record
667 }
668
669 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
670 response.raise_for_status()
671 result = response.json()
672
673 logger.info(f"Successfully created synthesis acknowledgment")
674 return result
675
676 except Exception as e:
677 logger.error(f"Error creating synthesis acknowledgment: {e}")
678 return None
679
680
681def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
682 """
683 Create a stream.thought.ack record to acknowledge a post.
684
685 This creates a custom acknowledgment record instead of a standard Bluesky like,
686 allowing void to track which posts it has engaged with.
687
688 Args:
689 client: Authenticated Bluesky client
690 post_uri: The URI of the post to acknowledge
691 post_cid: The CID of the post to acknowledge
692 note: Optional note to attach to the acknowledgment
693
694 Returns:
695 The response from creating the acknowledgment record or None if failed
696 """
697 try:
698 import requests
699 import json
700 from datetime import datetime, timezone
701
702 # Get session info from the client
703 # The atproto Client stores the session differently
704 access_token = None
705 user_did = None
706
707 # Try different ways to get the session info
708 if hasattr(client, '_session') and client._session:
709 access_token = client._session.access_jwt
710 user_did = client._session.did
711 elif hasattr(client, 'access_jwt'):
712 access_token = client.access_jwt
713 user_did = client.did if hasattr(client, 'did') else None
714 else:
715 logger.error("Cannot access client session information")
716 return None
717
718 if not access_token or not user_did:
719 logger.error("Missing access token or DID from session")
720 return None
721
722 from config_loader import get_bluesky_config
723 try:
724 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social')
725 except:
726 pds_host = os.getenv("PDS_URI", "https://bsky.social")
727
728 # Create acknowledgment record with stream.thought.ack type
729 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
730 ack_record = {
731 "$type": "stream.thought.ack",
732 "subject": {
733 "uri": post_uri,
734 "cid": post_cid
735 },
736 "createdAt": now,
737 "note": note # Will be null if no note provided
738 }
739
740 # Create the record
741 headers = {"Authorization": f"Bearer {access_token}"}
742 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
743
744 create_data = {
745 "repo": user_did,
746 "collection": "stream.thought.ack",
747 "record": ack_record
748 }
749
750 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
751 response.raise_for_status()
752 result = response.json()
753
754 logger.info(f"Successfully acknowledged post: {post_uri}")
755 return result
756
757 except Exception as e:
758 logger.error(f"Error acknowledging post: {e}")
759 return None
760
761
762def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
763 """
764 Create a stream.thought.tool_call record to track tool usage.
765
766 This creates a record of tool calls made by void during processing,
767 allowing for analysis of tool usage patterns and debugging.
768
769 Args:
770 client: Authenticated Bluesky client
771 tool_name: Name of the tool being called
772 arguments: Raw JSON string of the tool arguments
773 tool_call_id: Optional ID of the tool call for correlation
774
775 Returns:
776 The response from creating the tool call record or None if failed
777 """
778 try:
779 import requests
780 import json
781 from datetime import datetime, timezone
782
783 # Get session info from the client
784 access_token = None
785 user_did = None
786
787 # Try different ways to get the session info
788 if hasattr(client, '_session') and client._session:
789 access_token = client._session.access_jwt
790 user_did = client._session.did
791 elif hasattr(client, 'access_jwt'):
792 access_token = client.access_jwt
793 user_did = client.did if hasattr(client, 'did') else None
794 else:
795 logger.error("Cannot access client session information")
796 return None
797
798 if not access_token or not user_did:
799 logger.error("Missing access token or DID from session")
800 return None
801
802 from config_loader import get_bluesky_config
803 try:
804 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social')
805 except:
806 pds_host = os.getenv("PDS_URI", "https://bsky.social")
807
808 # Create tool call record
809 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
810 tool_record = {
811 "$type": "stream.thought.tool.call",
812 "tool_name": tool_name,
813 "arguments": arguments, # Store as string to avoid parsing issues
814 "createdAt": now
815 }
816
817 # Add tool_call_id if provided
818 if tool_call_id:
819 tool_record["tool_call_id"] = tool_call_id
820
821 # Create the record
822 headers = {"Authorization": f"Bearer {access_token}"}
823 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
824
825 create_data = {
826 "repo": user_did,
827 "collection": "stream.thought.tool.call",
828 "record": tool_record
829 }
830
831 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
832 if response.status_code != 200:
833 logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}")
834 response.raise_for_status()
835 result = response.json()
836
837 logger.debug(f"Successfully recorded tool call: {tool_name}")
838 return result
839
840 except Exception as e:
841 logger.error(f"Error creating tool call record: {e}")
842 return None
843
844
845def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
846 """
847 Create a stream.thought.reasoning record to track agent reasoning.
848
849 This creates a record of void's reasoning during message processing,
850 providing transparency into the decision-making process.
851
852 Args:
853 client: Authenticated Bluesky client
854 reasoning_text: The reasoning text from the agent
855
856 Returns:
857 The response from creating the reasoning record or None if failed
858 """
859 try:
860 import requests
861 import json
862 from datetime import datetime, timezone
863
864 # Get session info from the client
865 access_token = None
866 user_did = None
867
868 # Try different ways to get the session info
869 if hasattr(client, '_session') and client._session:
870 access_token = client._session.access_jwt
871 user_did = client._session.did
872 elif hasattr(client, 'access_jwt'):
873 access_token = client.access_jwt
874 user_did = client.did if hasattr(client, 'did') else None
875 else:
876 logger.error("Cannot access client session information")
877 return None
878
879 if not access_token or not user_did:
880 logger.error("Missing access token or DID from session")
881 return None
882
883 from config_loader import get_bluesky_config
884 try:
885 pds_host = get_bluesky_config().get('pds_uri', 'https://bsky.social')
886 except:
887 pds_host = os.getenv("PDS_URI", "https://bsky.social")
888
889 # Create reasoning record
890 now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
891 reasoning_record = {
892 "$type": "stream.thought.reasoning",
893 "reasoning": reasoning_text,
894 "createdAt": now
895 }
896
897 # Create the record
898 headers = {"Authorization": f"Bearer {access_token}"}
899 create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"
900
901 create_data = {
902 "repo": user_did,
903 "collection": "stream.thought.reasoning",
904 "record": reasoning_record
905 }
906
907 response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
908 response.raise_for_status()
909 result = response.json()
910
911 logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
912 return result
913
914 except Exception as e:
915 logger.error(f"Error creating reasoning record: {e}")
916 return None
917
918
919if __name__ == "__main__":
920 client = default_login()
921 # do something with the client
922 logger.info("Client is ready to use!")