"""A digital person for Bluesky."""
1import os
2import logging
3import uuid
4import time
5from typing import Optional, Dict, Any, List
6from atproto_client import Client, Session, SessionEvent, models
7
# Configure module-wide logging: INFO level, timestamped records tagged with
# the emitting logger's name and level.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Shared logger used by all session/thread helpers in this module.
logger = logging.getLogger("bluesky_session_handler")
13
14# Load the environment variables
15import dotenv
16dotenv.load_dotenv(override=True)
17
18import yaml
19import json
20
# Fields stripped from serialized thread data before it is handed to an LLM:
# protocol identifiers (cid/did/uri/ref), engagement counters, moderation and
# viewer state, and media metadata — noise that adds tokens without meaning.
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]
def convert_to_basic_types(obj):
    """Recursively convert complex Python objects to JSON/YAML-safe basic types.

    Dicts and lists are processed recursively, tuples and sets become lists,
    objects exposing ``__dict__`` are serialized via their attribute dict, and
    scalars (str/int/float/bool/None) pass through unchanged. Anything else
    falls back to ``str()``.

    Args:
        obj: Any Python object.

    Returns:
        A structure composed only of dict, list, str, int, float, bool, None.
    """
    # Check concrete container/scalar types BEFORE __dict__: a dict or list
    # subclass may carry a (usually empty) instance __dict__, and the old
    # __dict__-first order silently serialized such containers as {}.
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        # Generalization: tuples and sets become lists instead of str(obj).
        return [convert_to_basic_types(item) for item in obj]
    if isinstance(obj, (str, int, float, bool)) or obj is None:
        return obj
    if hasattr(obj, '__dict__'):
        # Arbitrary objects are serialized via their attribute dictionary.
        return convert_to_basic_types(obj.__dict__)
    # Last resort: represent unknown types (e.g. datetimes) as strings.
    return str(obj)
81
82
def strip_fields(obj, strip_field_list):
    """Recursively remove unwanted fields and empty values from a JSON-like tree.

    Mutates ``obj`` in place. Dictionary keys are dropped when they appear in
    ``strip_field_list`` or start with a double underscore (pydantic metadata);
    after recursion, entries whose value is None, an empty dict/list, or a
    whitespace-only string are pruned as well. Lists are filtered of None
    entries after their elements are cleaned.

    Args:
        obj: dict, list, or scalar to clean.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj``, cleaned.
    """
    if isinstance(obj, dict):
        # Drop flagged keys first (strip list + "__" pydantic metadata).
        for flagged in [k for k in obj if k in strip_field_list or k.startswith("__")]:
            del obj[flagged]

        # Recurse into the survivors, pruning values that end up empty.
        for key in list(obj):
            cleaned = strip_fields(obj[key], strip_field_list)
            obj[key] = cleaned
            should_prune = (
                cleaned is None
                or (isinstance(cleaned, (dict, list)) and not cleaned)
                or (isinstance(cleaned, str) and not cleaned.strip())
            )
            if should_prune:
                del obj[key]

    elif isinstance(obj, list):
        # Clean each element in place, then drop any that collapsed to None.
        for idx in range(len(obj)):
            obj[idx] = strip_fields(obj[idx], strip_field_list)
        obj[:] = [element for element in obj if element is not None]

    return obj
116
117
def extract_links_from_facets(record_text: str, facets: list) -> list:
    """
    Collect link URLs (with their display text) from post facets.

    Facet byte offsets index into the UTF-8 encoding of the post text, so
    the text is encoded once and sliced per link to recover the anchor text.

    Args:
        record_text: The post text the facet byte offsets refer to
        facets: List of facet objects from post record

    Returns:
        List of dicts with 'url' and 'text' keys
    """
    encoded = record_text.encode('utf-8')
    results = []

    for facet in facets:
        # Only features carrying a 'uri' attribute are link facets.
        for feature in (f for f in facet.features if hasattr(f, 'uri')):
            start = facet.index.byte_start
            end = facet.index.byte_end
            try:
                anchor = encoded[start:end].decode('utf-8')
            except (UnicodeDecodeError, IndexError):
                # Offsets splitting a multi-byte char: fall back to the URL.
                anchor = feature.uri
            results.append({'url': feature.uri, 'text': anchor})

    return results
146
147
def _image_view_dict(img, source=None) -> dict:
    """Build one image entry from an images#view item; 'source' omitted when None."""
    entry = {
        'fullsize': getattr(img, 'fullsize', None),
        'thumb': getattr(img, 'thumb', None),
        'alt': getattr(img, 'alt', '') or ''
    }
    if source is not None:
        entry['source'] = source
    return entry


def _link_preview_dict(thumb, title: str, source: str) -> dict:
    """Build one image entry from an external-link card thumbnail."""
    return {
        'fullsize': thumb,  # External links only expose a thumb; reuse it as fullsize.
        'thumb': thumb,
        'alt': f"Link preview: {title}" if title else 'Link preview image',
        'source': source
    }


def _video_thumb_dict(thumb, alt, source: str) -> dict:
    """Build one image entry from a video poster thumbnail."""
    return {
        'fullsize': thumb,
        'thumb': thumb,
        'alt': alt,
        'source': source
    }


def extract_images_from_embed(embed, include_thumbnails: bool = True) -> list[dict]:
    """Extract image URLs and alt text from a post embed (View type).

    This function handles the View types returned by get_post_thread(),
    which contain CDN URLs for images (unlike raw record embeds which
    only have BlobRefs).

    Also extracts thumbnails from external links and videos when include_thumbnails=True,
    including those nested inside quote posts.

    Args:
        embed: The embed object from post.embed (View type)
        include_thumbnails: Whether to include thumbnails from links/videos (default True)

    Returns:
        List of dicts with 'fullsize', 'thumb', 'alt', and optional 'source' keys
    """
    images = []
    if not embed:
        return images

    embed_type = getattr(embed, 'py_type', '')

    # Direct image embed (app.bsky.embed.images#view)
    if 'images' in embed_type and 'record' not in embed_type:
        images.extend(_image_view_dict(img) for img in embed.images)

    # External link with thumbnail (app.bsky.embed.external#view)
    elif 'external' in embed_type and 'record' not in embed_type and include_thumbnails:
        if hasattr(embed, 'external') and embed.external:
            thumb = getattr(embed.external, 'thumb', None)
            if thumb:
                title = getattr(embed.external, 'title', '') or ''
                images.append(_link_preview_dict(thumb, title, 'external_link'))

    # Video with thumbnail (app.bsky.embed.video#view)
    elif 'video' in embed_type and 'record' not in embed_type and include_thumbnails:
        thumb = getattr(embed, 'thumbnail', None)
        if thumb:
            alt = getattr(embed, 'alt', '') or 'Video thumbnail'
            images.append(_video_thumb_dict(thumb, alt, 'video'))

    # Quote post with media (app.bsky.embed.recordWithMedia#view)
    elif 'recordWithMedia' in embed_type and hasattr(embed, 'media'):
        media = embed.media
        media_type = getattr(media, 'py_type', '')
        # Images in media
        if 'images' in media_type and hasattr(media, 'images'):
            images.extend(_image_view_dict(img) for img in media.images)
        # External link thumbnail in media
        elif 'external' in media_type and include_thumbnails:
            if hasattr(media, 'external') and media.external:
                thumb = getattr(media.external, 'thumb', None)
                if thumb:
                    title = getattr(media.external, 'title', '') or ''
                    images.append(_link_preview_dict(thumb, title, 'external_link'))
        # Video thumbnail in media
        elif 'video' in media_type and include_thumbnails:
            thumb = getattr(media, 'thumbnail', None)
            if thumb:
                alt = getattr(media, 'alt', '') or 'Video thumbnail'
                images.append(_video_thumb_dict(thumb, alt, 'video'))

    # Quote post - check for images in nested embeds (app.bsky.embed.record#view)
    elif 'record' in embed_type and 'recordWithMedia' not in embed_type:
        if hasattr(embed, 'record') and embed.record:
            record = embed.record
            for nested in (getattr(record, 'embeds', None) or []):
                nested_type = getattr(nested, 'py_type', '')
                # Nested images
                if 'images' in nested_type and hasattr(nested, 'images'):
                    images.extend(
                        _image_view_dict(img, 'quoted_post') for img in nested.images
                    )
                # Nested external link thumbnail
                elif 'external' in nested_type and include_thumbnails:
                    if hasattr(nested, 'external') and nested.external:
                        thumb = getattr(nested.external, 'thumb', None)
                        if thumb:
                            title = getattr(nested.external, 'title', '') or ''
                            images.append(_link_preview_dict(thumb, title, 'quoted_post_link'))
                # Nested video thumbnail
                elif 'video' in nested_type and include_thumbnails:
                    thumb = getattr(nested, 'thumbnail', None)
                    if thumb:
                        alt = getattr(nested, 'alt', '') or 'Video thumbnail'
                        images.append(_video_thumb_dict(thumb, alt, 'quoted_post_video'))

    return images
280
281
def extract_images_from_thread(thread_data, max_images: int = 8) -> list[dict]:
    """Collect up to ``max_images`` images from a thread, parents first.

    Walks the nested thread structure from get_post_thread(), harvesting CDN
    image URLs from each post's View embed and tagging every image with the
    posting author's handle. Collection order is chronological (ancestors
    before the focus post, then replies).

    Args:
        thread_data: The thread data from get_post_thread
        max_images: Maximum number of images to extract (default 8)

    Returns:
        List of image dicts with 'fullsize', 'thumb', 'alt', 'author_handle' keys
    """
    collected: list[dict] = []

    def walk(node):
        if not node or len(collected) >= max_images:
            return

        # Ancestors first keeps chronological order.
        parent = getattr(node, 'parent', None)
        if parent:
            walk(parent)

        # Harvest from this post's View embed (not record.embed, which only
        # carries BlobRefs).
        post = getattr(node, 'post', None)
        if post and getattr(post, 'embed', None):
            handle = getattr(post.author, 'handle', 'unknown') if hasattr(post, 'author') else 'unknown'
            for entry in extract_images_from_embed(post.embed):
                if len(collected) >= max_images:
                    break
                entry['author_handle'] = handle
                collected.append(entry)

        # Then descend into replies.
        for reply in (getattr(node, 'replies', None) or []):
            if len(collected) >= max_images:
                break
            walk(reply)

    if hasattr(thread_data, 'thread'):
        walk(thread_data.thread)

    return collected
328
329
330def extract_external_link_from_embed(embed) -> dict | None:
331 """Extract external link card data from a post embed (View type).
332
333 External links are shown as "link cards" with URL, title, description,
334 and optional thumbnail.
335
336 Args:
337 embed: The embed object from post.embed (View type)
338
339 Returns:
340 Dict with 'url', 'title', 'description', 'thumbnail' keys, or None
341 """
342 if not embed:
343 return None
344
345 embed_type = getattr(embed, 'py_type', '')
346
347 # Direct external link embed (app.bsky.embed.external#view)
348 if 'external' in embed_type and hasattr(embed, 'external'):
349 external = embed.external
350 return {
351 'url': getattr(external, 'uri', ''),
352 'title': getattr(external, 'title', ''),
353 'description': getattr(external, 'description', ''),
354 'thumbnail': getattr(external, 'thumb', None)
355 }
356
357 # RecordWithMedia with external link (app.bsky.embed.recordWithMedia#view)
358 if 'recordWithMedia' in embed_type and hasattr(embed, 'media'):
359 media_type = getattr(embed.media, 'py_type', '')
360 if 'external' in media_type and hasattr(embed.media, 'external'):
361 external = embed.media.external
362 return {
363 'url': getattr(external, 'uri', ''),
364 'title': getattr(external, 'title', ''),
365 'description': getattr(external, 'description', ''),
366 'thumbnail': getattr(external, 'thumb', None)
367 }
368
369 return None
370
371
def extract_quote_post_from_embed(embed) -> dict | None:
    """Extract quoted post data from a record embed (View type).

    Quote posts embed another post, which can include the quoted text,
    author, and any media attached to the quoted post. Unavailable quotes
    (deleted, blocked, detached) are reported via a 'status' field rather
    than returning None, so callers can surface why the quote is missing.

    Args:
        embed: The embed object from post.embed (View type)

    Returns:
        Dict with quote post data, or None if not a quote or unavailable
    """
    if not embed:
        return None

    embed_type = getattr(embed, 'py_type', '')

    # Get the record object (works for both record and recordWithMedia)
    record = None
    if 'recordWithMedia' in embed_type and hasattr(embed, 'record'):
        # recordWithMedia has record.record for the actual quote
        record = getattr(embed.record, 'record', None)
    elif 'record' in embed_type and hasattr(embed, 'record'):
        record = embed.record

    if not record:
        return None

    record_type = getattr(record, 'py_type', '')

    # Handle different quote post states: each unavailable state returns
    # early with a status marker instead of post content.
    if 'viewNotFound' in record_type:
        return {
            'status': 'not_found',
            'uri': getattr(record, 'uri', ''),
            'message': 'Quoted post was deleted or not found'
        }

    if 'viewBlocked' in record_type:
        return {
            'status': 'blocked',
            'uri': getattr(record, 'uri', ''),
            'message': 'Quoted post is from a blocked account'
        }

    if 'viewDetached' in record_type:
        return {
            'status': 'detached',
            'uri': getattr(record, 'uri', ''),
            'message': 'Quoted post was detached'
        }

    # Normal quote post (viewRecord); the author check is a fallback in case
    # the py_type string is missing but the object looks like a viewRecord.
    if 'viewRecord' in record_type or hasattr(record, 'author'):
        result = {
            'status': 'available',
            'uri': getattr(record, 'uri', ''),
        }

        # Extract author info; display_name falls back to the handle.
        if hasattr(record, 'author') and record.author:
            author = record.author
            result['author'] = {
                'handle': getattr(author, 'handle', 'unknown'),
                'display_name': getattr(author, 'display_name', '') or getattr(author, 'handle', 'unknown')
            }

        # Extract the quoted post text from value
        # The 'value' field contains the actual post record
        if hasattr(record, 'value') and record.value:
            value = record.value
            # value can be a dict or an object
            if isinstance(value, dict):
                result['text'] = value.get('text', '')
            elif hasattr(value, 'text'):
                result['text'] = getattr(value, 'text', '')

        # Extract engagement metrics if present (None counts are omitted).
        metrics = {}
        if hasattr(record, 'like_count') and record.like_count is not None:
            metrics['likes'] = record.like_count
        if hasattr(record, 'repost_count') and record.repost_count is not None:
            metrics['reposts'] = record.repost_count
        if hasattr(record, 'reply_count') and record.reply_count is not None:
            metrics['replies'] = record.reply_count
        if hasattr(record, 'quote_count') and record.quote_count is not None:
            metrics['quotes'] = record.quote_count
        if metrics:
            result['metrics'] = metrics

        # Add thread context hints (for hybrid thread navigation)
        thread_context = {}

        # Reply count indicates replies exist below this post
        if metrics.get('replies'):
            thread_context['reply_count'] = metrics['replies']

        # Check if quoted post is itself a reply (has parents above)
        if hasattr(record, 'value') and record.value:
            value = record.value
            reply_ref = value.get('reply') if isinstance(value, dict) else getattr(value, 'reply', None)
            if reply_ref:
                thread_context['has_parents'] = True

        if thread_context:
            result['thread_context'] = thread_context

        # Check for nested embeds in the quoted post; only summaries are
        # recorded here (counts/urls), not full media payloads.
        if hasattr(record, 'embeds') and record.embeds:
            nested_embeds = []
            for nested in record.embeds:
                nested_type = getattr(nested, 'py_type', '')
                if 'images' in nested_type:
                    nested_embeds.append({'type': 'images', 'count': len(getattr(nested, 'images', []))})
                elif 'video' in nested_type:
                    nested_embeds.append({'type': 'video'})
                elif 'external' in nested_type:
                    ext = getattr(nested, 'external', None)
                    if ext:
                        nested_embeds.append({
                            'type': 'external_link',
                            'url': getattr(ext, 'uri', ''),
                            'title': getattr(ext, 'title', '')
                        })
            if nested_embeds:
                result['embeds'] = nested_embeds

        return result

    return None
502
503
504def extract_embed_data(embed) -> dict | None:
505 """Extract structured data from any embed type.
506
507 This is the main entry point for embed extraction. It detects the embed
508 type and delegates to the appropriate extraction function.
509
510 Args:
511 embed: The embed object from post.embed (View type)
512
513 Returns:
514 Dict with embed type and extracted data, or None if no embed
515 """
516 if not embed:
517 return None
518
519 embed_type = getattr(embed, 'py_type', '')
520
521 # Images
522 if 'images' in embed_type and 'record' not in embed_type:
523 images = extract_images_from_embed(embed)
524 if images:
525 return {
526 'type': 'images',
527 'images': images
528 }
529
530 # External link
531 if 'external' in embed_type and 'record' not in embed_type:
532 link = extract_external_link_from_embed(embed)
533 if link:
534 return {
535 'type': 'external_link',
536 'link': link
537 }
538
539 # Quote post (record)
540 if embed_type == 'app.bsky.embed.record#view':
541 quote = extract_quote_post_from_embed(embed)
542 if quote:
543 return {
544 'type': 'quote_post',
545 'quote': quote
546 }
547
548 # Quote post with media (recordWithMedia)
549 if 'recordWithMedia' in embed_type:
550 result = {'type': 'quote_with_media'}
551
552 # Extract the quote
553 quote = extract_quote_post_from_embed(embed)
554 if quote:
555 result['quote'] = quote
556
557 # Extract the media
558 if hasattr(embed, 'media'):
559 media_type = getattr(embed.media, 'py_type', '')
560 if 'images' in media_type:
561 images = extract_images_from_embed(embed)
562 if images:
563 result['media'] = {'type': 'images', 'images': images}
564 elif 'external' in media_type:
565 link = extract_external_link_from_embed(embed)
566 if link:
567 result['media'] = {'type': 'external_link', 'link': link}
568 elif 'video' in media_type:
569 # Basic video info
570 result['media'] = {
571 'type': 'video',
572 'thumbnail': getattr(embed.media, 'thumbnail', None),
573 'alt': getattr(embed.media, 'alt', None)
574 }
575
576 return result
577
578 # Video (basic handling)
579 if 'video' in embed_type:
580 return {
581 'type': 'video',
582 'thumbnail': getattr(embed, 'thumbnail', None),
583 'alt': getattr(embed, 'alt', None)
584 }
585
586 return None
587
588
def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a list while preserving all data.

    Posts are emitted in chronological order: ancestors of the focus post
    first (via upward traversal of 'parent'), then the post itself, then
    replies depth-first. Each flattened post keeps uri/cid, author, record
    text, extracted links, extracted embed data, and a 'parent_uri' used
    later for tree visualization.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with 'posts' key containing a list of posts in chronological order
    """
    posts = []

    def traverse_thread(node):
        """Recursively traverse the thread structure to collect posts."""
        if not node:
            return

        # If this node has a parent, traverse it first (to maintain chronological order)
        if hasattr(node, 'parent') and node.parent:
            traverse_thread(node.parent)

        # Then add this node's post
        if hasattr(node, 'post') and node.post:
            # Extract post data by accessing properties directly (not __dict__)
            # AT Protocol objects store data in properties, not __dict__
            post = node.post

            # Build post dict with proper property access
            post_dict = {}

            # Extract basic fields
            if hasattr(post, 'uri'):
                post_dict['uri'] = post.uri
            if hasattr(post, 'cid'):
                post_dict['cid'] = post.cid

            # Extract author info
            if hasattr(post, 'author') and post.author:
                author = post.author
                post_dict['author'] = {
                    'handle': getattr(author, 'handle', 'unknown'),
                    'display_name': getattr(author, 'display_name', 'unknown'),
                    'did': getattr(author, 'did', 'unknown')
                }

            # Extract record info (text, created_at, etc.)
            if hasattr(post, 'record') and post.record:
                record = post.record
                record_dict = {
                    'text': getattr(record, 'text', ''),
                    'createdAt': getattr(record, 'created_at', 'unknown')
                }

                # Extract links from facets if present
                if hasattr(record, 'facets') and record.facets:
                    links = extract_links_from_facets(
                        getattr(record, 'text', ''),
                        record.facets
                    )
                    if links:
                        record_dict['links'] = links

                post_dict['record'] = record_dict

            # Extract embed data from post.embed (View type with CDN URLs)
            # This is different from record.embed which only has raw BlobRefs
            if hasattr(post, 'embed') and post.embed:
                embed_data = extract_embed_data(post.embed)
                if embed_data:
                    post_dict['embed'] = embed_data

            # Extract parent_uri for tree visualization; None marks a root.
            parent_uri = None
            if hasattr(post, 'record') and post.record:
                record_obj = post.record
                if hasattr(record_obj, 'reply') and record_obj.reply:
                    reply_ref = record_obj.reply
                    if hasattr(reply_ref, 'parent') and reply_ref.parent:
                        if hasattr(reply_ref.parent, 'uri'):
                            parent_uri = reply_ref.parent.uri
            post_dict['parent_uri'] = parent_uri

            posts.append(post_dict)

        # Then traverse any replies (going DOWN the thread)
        if hasattr(node, 'replies') and node.replies:
            for reply in node.replies:
                traverse_thread(reply)

    # Handle the thread structure
    if hasattr(thread_data, 'thread'):
        # Start from the main thread node
        traverse_thread(thread_data.thread)
    elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
        traverse_thread(thread_data.__dict__['thread'])

    # Return a simple structure with posts list
    return {'posts': posts}
687
688
def count_thread_posts(thread):
    """
    Count the number of posts in a thread.

    Args:
        thread: The thread data from get_post_thread

    Returns:
        Integer count of posts in the thread
    """
    # Reuse the flattener so counting and rendering agree on what a post is.
    return len(flatten_thread_structure(thread).get('posts', []))
701
702
def compute_tree_prefixes(posts: List[Dict]) -> Dict[str, str]:
    """
    Compute tree-drawing prefixes for each post from parent relationships.

    Args:
        posts: List of post dicts, each with 'uri' and 'parent_uri' keys

    Returns:
        Dict mapping uri -> prefix string (e.g., "├─ ", "│ └─ ")
    """
    if not posts:
        return {}

    known_uris = {p.get('uri') for p in posts if p.get('uri')}
    children_of: Dict[str, List[str]] = {}
    roots: List[str] = []

    # Partition into roots and per-parent child lists (input order preserved).
    for entry in posts:
        uri = entry.get('uri')
        if not uri:
            continue
        parent = entry.get('parent_uri')
        if parent and parent in known_uris:
            children_of.setdefault(parent, []).append(uri)
        else:
            # Missing or unknown parent makes this post a root.
            roots.append(uri)

    prefixes: Dict[str, str] = {}
    seen: set = set()

    def assign(uri: str, last_flags: List[bool]) -> None:
        # last_flags[i] records whether each ancestor (and finally this node)
        # was the last child at its level; `seen` guards against cycles.
        if uri in seen:
            return
        seen.add(uri)

        parts = [" " if flag else "│ " for flag in last_flags[:-1]]
        if last_flags:
            parts.append("└─ " if last_flags[-1] else "├─ ")
        prefixes[uri] = "".join(parts)

        kids = children_of.get(uri, [])
        for pos, kid in enumerate(kids):
            assign(kid, last_flags + [pos == len(kids) - 1])

    for pos, root in enumerate(roots):
        if len(roots) == 1:
            # A lone root gets no prefix; its children start the tree lines.
            prefixes[root] = ""
            kids = children_of.get(root, [])
            for kid_pos, kid in enumerate(kids):
                assign(kid, [kid_pos == len(kids) - 1])
        else:
            assign(root, [pos == len(roots) - 1])

    return prefixes
759
760
def build_tree_view(posts: List[Dict]) -> str:
    """
    Render a thread as tree-style text, one line per post.

    Args:
        posts: List of post dicts with uri, parent_uri, author, record fields

    Returns:
        Multi-line string showing thread structure with tree prefixes
    """
    if not posts:
        return "(empty thread)"

    prefix_map = compute_tree_prefixes(posts)

    def render(post: Dict) -> str:
        # Newlines inside a post are flattened so each post stays on one line.
        handle = post.get('author', {}).get('handle', 'unknown')
        body = post.get('record', {}).get('text', '').replace('\n', ' | ')
        return f"{prefix_map.get(post.get('uri', ''), '')}@{handle}: {body}"

    return "\n".join(render(post) for post in posts)
789
790
def thread_to_yaml_string(thread, strip_metadata=True, include_tree_view=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.

    The thread is first flattened to a posts list (avoiding deep nesting),
    optionally prefixed with an ASCII tree view of the reply structure, then
    serialized to YAML after converting to basic types and (optionally)
    stripping metadata fields listed in STRIP_FIELDS.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output
        include_tree_view: Whether to prepend a tree visualization of the thread

    Returns:
        String representation of the thread with optional tree view and YAML data
    """
    # First flatten the thread structure to avoid deep nesting
    flattened = flatten_thread_structure(thread)
    posts = flattened.get('posts', [])

    output_parts = []

    # Build tree visualization if requested (skipped for empty threads)
    if include_tree_view and posts:
        tree_view = build_tree_view(posts)
        output_parts.append("THREAD STRUCTURE:")
        output_parts.append(tree_view)
        output_parts.append("")
        output_parts.append("FULL POST DATA:")

    # Convert complex objects to basic types so yaml.dump can serialize them
    basic_thread = convert_to_basic_types(flattened)

    if strip_metadata:
        # strip_fields mutates in place; basic_thread is already a fresh copy
        cleaned_thread = strip_fields(basic_thread, STRIP_FIELDS)
    else:
        cleaned_thread = basic_thread

    # default_flow_style=False forces block style; allow_unicode keeps
    # non-ASCII post text readable instead of escaped.
    yaml_output = yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)
    output_parts.append(yaml_output)

    return "\n".join(output_parts)
830
831
832
833
834
835
836
def get_session(username: str) -> Optional[str]:
    """Load a previously saved session string for ``username``, if any.

    Args:
        username: Account the session file belongs to.

    Returns:
        The stored session string, or None when no session file exists.
    """
    session_path = f"session_{username}.txt"
    try:
        with open(session_path, encoding="UTF-8") as handle:
            return handle.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None
844
def save_session(username: str, session_string: str) -> None:
    """Persist the session string for ``username`` to its session file."""
    session_path = f"session_{username}.txt"
    with open(session_path, "w", encoding="UTF-8") as handle:
        handle.write(session_string)
    logger.debug(f"Session saved for {username}")
849
def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Persist the session whenever the client creates or refreshes it."""
    logger.debug(f"Session changed: {event} {repr(session)}")
    # Only CREATE/REFRESH events carry fresh tokens worth saving.
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug(f"Saving changed session for {username}")
        save_session(username, session.export())
855
def init_client(username: str, password: str) -> Client:
    """Build an authenticated Client, reusing a cached session when possible.

    The PDS endpoint comes from the PDS_URI environment variable, falling
    back to bsky.social with a warning. A previously saved session string is
    reused when present; otherwise a fresh password login is performed.
    Session changes are persisted via on_session_change.

    Args:
        username: Bluesky handle to log in as.
        password: Account (or app) password, used only when no session exists.

    Returns:
        An authenticated Client.
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    logger.debug(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    # Persist created/refreshed sessions so later runs can skip the password login.
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    existing = get_session(username)
    if existing:
        logger.debug(f"Reusing existing session for {username}")
        client.login(session_string=existing)
    else:
        logger.debug(f"Creating new session for {username}")
        client.login(username, password)

    return client
881
882
def default_login() -> Client:
    """Login using configuration from config.yaml or environment variables.

    Loads Bluesky credentials (username/password) and an optional PDS URI via
    config_loader, then performs a fresh password login.

    Returns:
        An authenticated Client.

    Raises:
        SystemExit: With code 1 if configuration loading or login fails.
    """
    try:
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()

        username = bluesky_config['username']
        password = bluesky_config['password']
        pds_uri = bluesky_config.get('pds_uri', 'https://bsky.social')

        logger.info(f"Logging into Bluesky as {username} via {pds_uri}")

        # Use pds_uri from config
        client = Client(base_url=pds_uri)
        client.login(username, password)
        return client

    except Exception as e:
        logger.error(f"Failed to load Bluesky configuration: {e}")
        logger.error("Please check your config.yaml file or environment variables")
        # Raise SystemExit explicitly: the exit() builtin is injected by the
        # site module and may be absent (e.g. under `python -S`); chaining
        # `from e` also preserves the original failure for debugging.
        raise SystemExit(1) from e
904
def remove_outside_quotes(text: str) -> str:
    """
    Strip a single pair of surrounding double quotes from response text.

    Only double quotes are handled, so contractions and single-quoted text
    are left untouched; quotes inside the string are preserved. Input shorter
    than two characters is returned unchanged; otherwise the text is
    whitespace-trimmed before the quote check.

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    trimmed = text.strip()
    is_double_quoted = trimmed.startswith('"') and trimmed.endswith('"')
    return trimmed[1:-1] if is_double_quoted else trimmed
929
930def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None, correlation_id: Optional[str] = None) -> Dict[str, Any]:
931 """
932 Reply to a post on Bluesky with rich text support.
933
934 Args:
935 client: Authenticated Bluesky client
936 text: The reply text
937 reply_to_uri: The URI of the post being replied to (parent)
938 reply_to_cid: The CID of the post being replied to (parent)
939 root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
940 root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid
941 lang: Language code for the post (e.g., 'en-US', 'es', 'ja')
942 correlation_id: Unique ID for tracking this message through the pipeline
943
944 Returns:
945 The response from sending the post
946 """
947 import re
948
949 # Generate correlation ID if not provided
950 if correlation_id is None:
951 correlation_id = str(uuid.uuid4())[:8]
952
953 # Enhanced logging with structured data
954 logger.info(f"[{correlation_id}] Starting reply_to_post", extra={
955 'correlation_id': correlation_id,
956 'text_length': len(text),
957 'text_preview': text[:100] + '...' if len(text) > 100 else text,
958 'reply_to_uri': reply_to_uri,
959 'root_uri': root_uri,
960 'lang': lang
961 })
962
963 start_time = time.time()
964
965 # If root is not provided, this is a reply to the root post
966 if root_uri is None:
967 root_uri = reply_to_uri
968 root_cid = reply_to_cid
969
970 # Create references for the reply
971 parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
972 root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))
973
974 # Parse rich text facets (mentions and URLs)
975 facets = []
976 text_bytes = text.encode("UTF-8")
977 mentions_found = []
978 urls_found = []
979
980 logger.debug(f"[{correlation_id}] Parsing facets from text (length: {len(text_bytes)} bytes)")
981
982 # Parse mentions - fixed to handle @ at start of text
983 mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
984
985 for m in re.finditer(mention_regex, text_bytes):
986 handle = m.group(1)[1:].decode("UTF-8") # Remove @ prefix
987 mentions_found.append(handle)
988 # Adjust byte positions to account for the optional prefix
989 mention_start = m.start(1)
990 mention_end = m.end(1)
991 try:
992 # Resolve handle to DID using the API
993 resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
994 if resolve_resp and hasattr(resolve_resp, 'did'):
995 facets.append(
996 models.AppBskyRichtextFacet.Main(
997 index=models.AppBskyRichtextFacet.ByteSlice(
998 byteStart=mention_start,
999 byteEnd=mention_end
1000 ),
1001 features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
1002 )
1003 )
1004 logger.debug(f"[{correlation_id}] Resolved mention @{handle} -> {resolve_resp.did}")
1005 except Exception as e:
1006 logger.warning(f"[{correlation_id}] Failed to resolve handle @{handle}: {e}")
1007 continue
1008
1009 # Parse URLs - fixed to handle URLs at start of text
1010 url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
1011
1012 for m in re.finditer(url_regex, text_bytes):
1013 url = m.group(1).decode("UTF-8")
1014 urls_found.append(url)
1015 # Adjust byte positions to account for the optional prefix
1016 url_start = m.start(1)
1017 url_end = m.end(1)
1018 facets.append(
1019 models.AppBskyRichtextFacet.Main(
1020 index=models.AppBskyRichtextFacet.ByteSlice(
1021 byteStart=url_start,
1022 byteEnd=url_end
1023 ),
1024 features=[models.AppBskyRichtextFacet.Link(uri=url)]
1025 )
1026 )
1027 logger.debug(f"[{correlation_id}] Found URL: {url}")
1028
1029 # Parse hashtags
1030 hashtag_regex = rb"(?:^|[$|\s])#([a-zA-Z0-9_]+)"
1031 hashtags_found = []
1032
1033 for m in re.finditer(hashtag_regex, text_bytes):
1034 tag = m.group(1).decode("UTF-8") # Get tag without # prefix
1035 hashtags_found.append(tag)
1036 # Get byte positions for the entire hashtag including #
1037 tag_start = m.start(0)
1038 # Adjust start if there's a space/prefix
1039 if text_bytes[tag_start:tag_start+1] in (b' ', b'$'):
1040 tag_start += 1
1041 tag_end = m.end(0)
1042 facets.append(
1043 models.AppBskyRichtextFacet.Main(
1044 index=models.AppBskyRichtextFacet.ByteSlice(
1045 byteStart=tag_start,
1046 byteEnd=tag_end
1047 ),
1048 features=[models.AppBskyRichtextFacet.Tag(tag=tag)]
1049 )
1050 )
1051 logger.debug(f"[{correlation_id}] Found hashtag: #{tag}")
1052
1053 logger.debug(f"[{correlation_id}] Facet parsing complete", extra={
1054 'correlation_id': correlation_id,
1055 'mentions_count': len(mentions_found),
1056 'mentions': mentions_found,
1057 'urls_count': len(urls_found),
1058 'urls': urls_found,
1059 'hashtags_count': len(hashtags_found),
1060 'hashtags': hashtags_found,
1061 'total_facets': len(facets)
1062 })
1063
1064 # Send the reply with facets if any were found
1065 logger.info(f"[{correlation_id}] Sending reply to Bluesky API", extra={
1066 'correlation_id': correlation_id,
1067 'has_facets': bool(facets),
1068 'facet_count': len(facets),
1069 'lang': lang
1070 })
1071
1072 try:
1073 if facets:
1074 response = client.send_post(
1075 text=text,
1076 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
1077 facets=facets,
1078 langs=[lang] if lang else None
1079 )
1080 else:
1081 response = client.send_post(
1082 text=text,
1083 reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
1084 langs=[lang] if lang else None
1085 )
1086
1087 # Calculate response time
1088 response_time = time.time() - start_time
1089
1090 # Extract post URL for user-friendly logging
1091 post_url = None
1092 if hasattr(response, 'uri') and response.uri:
1093 # Convert AT-URI to web URL
1094 # Format: at://did:plc:xxx/app.bsky.feed.post/xxx -> https://bsky.app/profile/handle/post/xxx
1095 try:
1096 uri_parts = response.uri.split('/')
1097 if len(uri_parts) >= 4 and uri_parts[3] == 'app.bsky.feed.post':
1098 rkey = uri_parts[4]
1099 # We'd need to resolve DID to handle, but for now just use the URI
1100 post_url = f"bsky://post/{rkey}"
1101 except:
1102 pass
1103
1104 logger.info(f"[{correlation_id}] Reply sent successfully ({response_time:.3f}s) - URI: {response.uri}" +
1105 (f" - URL: {post_url}" if post_url else ""), extra={
1106 'correlation_id': correlation_id,
1107 'response_time': round(response_time, 3),
1108 'post_uri': response.uri,
1109 'post_url': post_url,
1110 'post_cid': getattr(response, 'cid', None),
1111 'text_length': len(text)
1112 })
1113
1114 return response
1115
1116 except Exception as e:
1117 response_time = time.time() - start_time
1118 logger.error(f"[{correlation_id}] Failed to send reply", extra={
1119 'correlation_id': correlation_id,
1120 'error': str(e),
1121 'error_type': type(e).__name__,
1122 'response_time': round(response_time, 3),
1123 'text_length': len(text)
1124 })
1125 raise
1126
1127
1128def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
1129 """
1130 Get the thread containing a post to find root post information.
1131
1132 Args:
1133 client: Authenticated Bluesky client
1134 uri: The URI of the post
1135
1136 Returns:
1137 The thread data or None if not found
1138 """
1139 try:
1140 thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
1141 return thread
1142 except Exception as e:
1143 logger.error(f"Error fetching post thread: {e}")
1144 return None
1145
1146
def find_last_consecutive_post_in_chain(thread_node, author_handle: str):
    """
    Find the last consecutive post in the direct reply chain by the same author.

    Walks DOWN the reply chain starting at ``thread_node``, following only
    replies authored by ``author_handle``, and reports the deepest such post.

    Args:
        thread_node: The thread node to start from (usually the mention post's thread node)
        author_handle: The handle of the author to match (e.g., "user.bsky.social")

    Returns:
        Tuple of (uri, cid, text) for the last consecutive post by the author,
        or None if the starting node is unusable or not by the target author.

    Example:
        If the thread structure is:
        - Post A by @alice (mention) -> thread_node starts here
          - Post B by @alice (consecutive)
            - Post C by @alice (consecutive)
              - Post D by @bob (different author, stop here)

        Returns (uri_C, cid_C, text_C)
    """
    def _author_of(post):
        # Handle of the post's author, or None when the structure is incomplete.
        author = getattr(post, 'author', None)
        return getattr(author, 'handle', None)

    post = getattr(thread_node, 'post', None) if thread_node else None
    if not post or _author_of(post) != author_handle:
        # No usable starting post, or it isn't authored by the target handle.
        return None

    # Track the deepest matching post seen so far (starting with this node).
    last_uri = getattr(post, 'uri', None)
    last_cid = getattr(post, 'cid', None)
    record = getattr(post, 'record', None)
    last_text = getattr(record, 'text', "") if record is not None else ""

    node = thread_node
    while True:
        replies = getattr(node, 'replies', None)
        if not replies:
            break  # Reached the end of the reply chain.

        # Follow the first reply authored by the same handle, if any.
        next_node = next(
            (r for r in replies
             if getattr(r, 'post', None) and _author_of(r.post) == author_handle),
            None,
        )
        if next_node is None:
            break  # The run of consecutive posts by this author ends here.

        node = next_node
        post = node.post
        last_uri = getattr(post, 'uri', last_uri)
        last_cid = getattr(post, 'cid', last_cid)
        record = getattr(post, 'record', None)
        if record is not None and hasattr(record, 'text'):
            last_text = record.text

    # Only report a result when both identifiers are actually present.
    return (last_uri, last_cid, last_text) if last_uri and last_cid else None
1240
1241
def find_consecutive_parent_posts_by_author(thread_node, author_handle: str) -> List[Dict]:
    """
    Collect consecutive posts by the same author in the parent chain.

    Walks UP from ``thread_node`` through ``parent`` links, gathering every
    post authored by ``author_handle`` until a different author (or the top
    of the thread) is reached. This is the inverse of
    find_last_consecutive_post_in_chain, which traverses DOWN.

    Args:
        thread_node: The thread node to start from (the notification post's thread node)
        author_handle: The handle of the author to match (e.g., "user.bsky.social")

    Returns:
        List of post dicts for consecutive posts by the author in the parent
        chain, in chronological order (oldest first). Empty list if there are
        no parent posts by the same author.

    Example:
        If the thread structure is:
        - Post A by @alice (first part)
          - Post B by @alice (consecutive) <- start from here (notification)

        Returns [Post A dict] (not including Post B since that's the current node)
    """
    parent_posts: List[Dict] = []
    if not thread_node:
        return parent_posts

    node = thread_node
    while True:
        parent = getattr(node, 'parent', None)
        if not parent:
            break  # Reached the top of the thread.
        post = getattr(parent, 'post', None)
        if not post:
            break  # Parent node has no usable post payload.

        author = getattr(post, 'author', None)
        handle = getattr(author, 'handle', None)
        if handle != author_handle:
            break  # Different author: the consecutive run ends here.

        record = getattr(post, 'record', None)
        parent_posts.append({
            'uri': getattr(post, 'uri', ''),
            'cid': getattr(post, 'cid', ''),
            'author': {
                'handle': handle,
                'display_name': getattr(author, 'display_name', '') if author is not None else '',
                'did': getattr(author, 'did', '') if author is not None else '',
            },
            'record': {
                'text': getattr(record, 'text', '') if record is not None else '',
                'createdAt': getattr(record, 'created_at', '') if record is not None else '',
            },
        })

        # Climb one level and keep scanning.
        node = parent

    # Collected newest-first while climbing; flip to chronological order.
    parent_posts.reverse()
    return parent_posts
1316
1317
def reply_to_notification(client: Client, notification: Any, reply_text: str, lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_text: The text to reply with
        lang: Language code for the post (defaults to "en-US")
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        The response from sending the reply or None if failed
    """
    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    logger.info(f"[{correlation_id}] Processing reply_to_notification", extra={
        'correlation_id': correlation_id,
        'reply_length': len(reply_text),
        'lang': lang
    })

    try:
        # Pull uri/cid and any reply metadata out of the notification,
        # which may arrive either as a plain dict or as an object.
        if isinstance(notification, dict):
            post_uri = notification.get('uri')
            post_cid = notification.get('cid')
            record = notification.get('record', {})
            reply_info = record.get('reply') if isinstance(record, dict) else None
        elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
            post_uri = notification.uri
            post_cid = notification.cid
            reply_info = getattr(getattr(notification, 'record', None), 'reply', None)
        else:
            # Unrecognized notification shape; fail the uri/cid check below.
            post_uri = None
            post_cid = None
            reply_info = None

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Default the thread root to this post; override below only when the
        # notification itself was a reply carrying usable root information.
        root_uri, root_cid = post_uri, post_cid
        if reply_info:
            if isinstance(reply_info, dict):
                root_ref = reply_info.get('root')
                if root_ref and isinstance(root_ref, dict):
                    root_uri = root_ref.get('uri', post_uri)
                    root_cid = root_ref.get('cid', post_cid)
            elif hasattr(reply_info, 'root'):
                root = reply_info.root
                if hasattr(root, 'uri') and hasattr(root, 'cid'):
                    root_uri, root_cid = root.uri, root.cid

        # Reply to the notification
        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid,
            lang=lang,
            correlation_id=correlation_id
        )

    except Exception as e:
        logger.error(f"[{correlation_id}] Error replying to notification: {e}", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__
        })
        return None
1412
1413
def reply_with_thread_to_notification(client: Client, notification: Any, reply_messages: List[str], lang: str = "en-US", correlation_id: Optional[str] = None) -> Optional[List[Dict[str, Any]]]:
    """
    Reply to a notification with a threaded chain of messages (max 15).

    Each message is posted as a reply to the previously posted one, so the
    result reads as a single thread hanging off the notification post. If an
    individual message fails to post, a visible "[SYSTEM FAILURE...]"
    placeholder is posted in its place so the thread is not silently
    truncated; if even the placeholder cannot be posted, whatever partial
    list of responses exists is returned.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_messages: List of reply texts (max 15 messages, each max 300 chars)
        lang: Language code for the posts (defaults to "en-US")
        correlation_id: Unique ID for tracking this message through the pipeline

    Returns:
        List of responses from sending the replies or None if failed
    """
    # Generate correlation ID if not provided
    if correlation_id is None:
        correlation_id = str(uuid.uuid4())[:8]

    logger.info(f"[{correlation_id}] Starting threaded reply", extra={
        'correlation_id': correlation_id,
        'message_count': len(reply_messages),
        'total_length': sum(len(msg) for msg in reply_messages),
        'lang': lang
    })

    try:
        # Validate input: must have at least one message and at most 15.
        if not reply_messages or len(reply_messages) == 0:
            logger.error(f"[{correlation_id}] Reply messages list cannot be empty")
            return None
        if len(reply_messages) > 15:
            logger.error(f"[{correlation_id}] Cannot send more than 15 reply messages (got {len(reply_messages)})")
            return None

        # Get the post URI and CID from the notification (handle both dict and object)
        if isinstance(notification, dict):
            post_uri = notification.get('uri')
            post_cid = notification.get('cid')
            # Check if the notification record has reply info with root
            record = notification.get('record', {})
            reply_info = record.get('reply') if isinstance(record, dict) else None
        elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
            post_uri = notification.uri
            post_cid = notification.cid
            # Check if the notification record has reply info with root
            reply_info = None
            if hasattr(notification, 'record') and hasattr(notification.record, 'reply'):
                reply_info = notification.record.reply
        else:
            # Unrecognized notification shape; fails the uri/cid check below.
            post_uri = None
            post_cid = None
            reply_info = None

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Determine root: if post has reply info, use its root; otherwise this post IS the root
        if reply_info:
            # Extract root from the notification's reply structure
            if isinstance(reply_info, dict):
                root_ref = reply_info.get('root')
                if root_ref and isinstance(root_ref, dict):
                    root_uri = root_ref.get('uri', post_uri)
                    root_cid = root_ref.get('cid', post_cid)
                else:
                    # No root in reply info, use post as root
                    root_uri = post_uri
                    root_cid = post_cid
            elif hasattr(reply_info, 'root'):
                if hasattr(reply_info.root, 'uri') and hasattr(reply_info.root, 'cid'):
                    root_uri = reply_info.root.uri
                    root_cid = reply_info.root.cid
                else:
                    root_uri = post_uri
                    root_cid = post_cid
            else:
                root_uri = post_uri
                root_cid = post_cid
        else:
            # No reply info means this post IS the root
            root_uri = post_uri
            root_cid = post_cid

        # Send replies in sequence, creating a thread: each reply's parent is
        # the previously posted message, while root stays fixed for the thread.
        responses = []
        current_parent_uri = post_uri
        current_parent_cid = post_cid

        for i, message in enumerate(reply_messages):
            # Per-message correlation ID so each post can be traced in logs.
            thread_correlation_id = f"{correlation_id}-{i+1}"
            logger.info(f"[{thread_correlation_id}] Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")

            # Send this reply
            response = reply_to_post(
                client=client,
                text=message,
                reply_to_uri=current_parent_uri,
                reply_to_cid=current_parent_cid,
                root_uri=root_uri,
                root_cid=root_cid,
                lang=lang,
                correlation_id=thread_correlation_id
            )

            if not response:
                logger.error(f"[{thread_correlation_id}] Failed to send reply {i+1}, posting system failure message")
                # Try to post a system failure message so readers can see the
                # thread is incomplete rather than silently dropping a message.
                failure_response = reply_to_post(
                    client=client,
                    text="[SYSTEM FAILURE: COULD NOT POST MESSAGE, PLEASE TRY AGAIN]",
                    reply_to_uri=current_parent_uri,
                    reply_to_cid=current_parent_cid,
                    root_uri=root_uri,
                    root_cid=root_cid,
                    lang=lang,
                    correlation_id=f"{thread_correlation_id}-FAIL"
                )
                if failure_response:
                    # Chain subsequent replies off the placeholder post.
                    responses.append(failure_response)
                    current_parent_uri = failure_response.uri
                    current_parent_cid = failure_response.cid
                else:
                    logger.error(f"[{thread_correlation_id}] Could not even send system failure message, stopping thread")
                    # Give callers whatever part of the thread was posted.
                    return responses if responses else None
            else:
                responses.append(response)
                # Update parent references for next reply (if any)
                if i < len(reply_messages) - 1:  # Not the last message
                    current_parent_uri = response.uri
                    current_parent_cid = response.cid

        logger.info(f"[{correlation_id}] Successfully sent {len(responses)} threaded replies", extra={
            'correlation_id': correlation_id,
            'replies_sent': len(responses),
            'replies_requested': len(reply_messages)
        })
        return responses

    except Exception as e:
        logger.error(f"[{correlation_id}] Error sending threaded reply to notification: {e}", extra={
            'correlation_id': correlation_id,
            'error': str(e),
            'error_type': type(e).__name__,
            'message_count': len(reply_messages)
        })
        return None
1561
1562
def create_synthesis_ack(client: Client, note: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record for synthesis without a target post.

    The acknowledgment is written directly to the PDS via
    com.atproto.repo.createRecord with a null subject, marking it as a
    synthesis that is not tied to any specific post.

    Args:
        client: Authenticated Bluesky client
        note: The synthesis note/content

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        import requests
        import json
        from datetime import datetime, timezone

        # Pull auth credentials out of the client session; the atproto
        # Client has stored these in different places across versions.
        access_token = None
        user_did = None
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # The PDS endpoint comes from bot configuration, not the environment.
        from config_loader import get_bluesky_config
        pds_host = get_bluesky_config()['pds_uri']

        # Assemble the acknowledgment payload; subject stays null for synthesis.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        payload = {
            "repo": user_did,
            "collection": "stream.thought.ack",
            "record": {
                "$type": "stream.thought.ack",
                "subject": None,  # Null subject for synthesis
                "createdAt": timestamp,
                "note": note
            }
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        logger.info(f"Successfully created synthesis acknowledgment")
        return result

    except Exception as e:
        logger.error(f"Error creating synthesis acknowledgment: {e}")
        return None
1634
1635
def acknowledge_post(client: Client, post_uri: str, post_cid: str, note: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.ack record to acknowledge a post.

    Writes a custom acknowledgment record straight to the PDS via
    com.atproto.repo.createRecord instead of a standard Bluesky like,
    allowing void to track which posts it has engaged with.

    Args:
        client: Authenticated Bluesky client
        post_uri: The URI of the post to acknowledge
        post_cid: The CID of the post to acknowledge
        note: Optional note to attach to the acknowledgment

    Returns:
        The response from creating the acknowledgment record or None if failed
    """
    try:
        import requests
        import json
        from datetime import datetime, timezone

        # Pull auth credentials out of the client session; the atproto
        # Client has stored these in different places across versions.
        access_token = None
        user_did = None
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # The PDS endpoint comes from bot configuration, not the environment.
        from config_loader import get_bluesky_config
        pds_host = get_bluesky_config()['pds_uri']

        # Assemble the acknowledgment record pointing at the target post.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        payload = {
            "repo": user_did,
            "collection": "stream.thought.ack",
            "record": {
                "$type": "stream.thought.ack",
                "subject": {
                    "uri": post_uri,
                    "cid": post_cid
                },
                "createdAt": timestamp,
                "note": note  # Will be null if no note provided
            }
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        logger.info(f"Successfully acknowledged post: {post_uri}")
        return result

    except Exception as e:
        logger.error(f"Error acknowledging post: {e}")
        return None
1714
1715
def create_tool_call_record(client: Client, tool_name: str, arguments: str, tool_call_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.tool.call record to track tool usage.

    This creates a record of tool calls made by void during processing,
    allowing for analysis of tool usage patterns and debugging. The record
    is written directly to the PDS via com.atproto.repo.createRecord.

    Args:
        client: Authenticated Bluesky client
        tool_name: Name of the tool being called
        arguments: Raw JSON string of the tool arguments (stored unparsed)
        tool_call_id: Optional ID of the tool call for correlation

    Returns:
        The response from creating the tool call record or None if failed
    """
    try:
        import requests
        import json
        from datetime import datetime, timezone

        # Get session info from the client
        access_token = None
        user_did = None

        # Try different ways to get the session info; the atproto Client has
        # stored credentials in different places across library versions.
        if hasattr(client, '_session') and client._session:
            access_token = client._session.access_jwt
            user_did = client._session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = client.did if hasattr(client, 'did') else None
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # Get PDS URI from config instead of environment variables
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        pds_host = bluesky_config['pds_uri']

        # Create tool call record
        now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        tool_record = {
            "$type": "stream.thought.tool.call",
            "tool_name": tool_name,
            "arguments": arguments,  # Store as string to avoid parsing issues
            "createdAt": now
        }

        # Add tool_call_id if provided
        if tool_call_id:
            tool_record["tool_call_id"] = tool_call_id

        # Create the record
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        create_data = {
            "repo": user_did,
            "collection": "stream.thought.tool.call",
            "record": tool_record
        }

        response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
        # Log the server's error body before raising so failures are diagnosable.
        if response.status_code != 200:
            logger.error(f"Tool call record creation failed: {response.status_code} - {response.text}")
        response.raise_for_status()
        result = response.json()

        logger.debug(f"Successfully recorded tool call: {tool_name}")
        return result

    except Exception as e:
        logger.error(f"Error creating tool call record: {e}")
        return None
1796
1797
def create_reasoning_record(client: Client, reasoning_text: str) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.reasoning record to track agent reasoning.

    Persists void's reasoning during message processing to the PDS via
    com.atproto.repo.createRecord, providing transparency into the
    decision-making process.

    Args:
        client: Authenticated Bluesky client
        reasoning_text: The reasoning text from the agent

    Returns:
        The response from creating the reasoning record or None if failed
    """
    try:
        import requests
        import json
        from datetime import datetime, timezone

        # Pull auth credentials out of the client session; the atproto
        # Client has stored these in different places across versions.
        access_token = None
        user_did = None
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # The PDS endpoint comes from bot configuration, not the environment.
        from config_loader import get_bluesky_config
        pds_host = get_bluesky_config()['pds_uri']

        # Assemble the reasoning record.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        payload = {
            "repo": user_did,
            "collection": "stream.thought.reasoning",
            "record": {
                "$type": "stream.thought.reasoning",
                "reasoning": reasoning_text,
                "createdAt": timestamp
            }
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        logger.debug(f"Successfully recorded reasoning (length: {len(reasoning_text)} chars)")
        return result

    except Exception as e:
        logger.error(f"Error creating reasoning record: {e}")
        return None
1869
1870
def create_memory_record(client: Client, content: str, tags: Optional[List[str]] = None) -> Optional[Dict[str, Any]]:
    """
    Create a stream.thought.memory record to store archival memory insertions.

    Persists the content of archival_memory_insert tool calls to the PDS via
    com.atproto.repo.createRecord, preserving important memories and context
    in the AT Protocol.

    Args:
        client: Authenticated Bluesky client
        content: The memory content being archived
        tags: Optional list of tags associated with this memory

    Returns:
        The response from creating the memory record or None if failed
    """
    try:
        import requests
        import json
        from datetime import datetime, timezone

        # Pull auth credentials out of the client session; the atproto
        # Client has stored these in different places across versions.
        access_token = None
        user_did = None
        session = getattr(client, '_session', None)
        if session:
            access_token = session.access_jwt
            user_did = session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = getattr(client, 'did', None)
        else:
            logger.error("Cannot access client session information")
            return None

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return None

        # The PDS endpoint comes from bot configuration, not the environment.
        from config_loader import get_bluesky_config
        pds_host = get_bluesky_config()['pds_uri']

        # Assemble the memory record; tags are only attached when provided.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        memory_record = {
            "$type": "stream.thought.memory",
            "content": content,
            "createdAt": timestamp
        }
        if tags is not None:
            memory_record["tags"] = tags

        payload = {
            "repo": user_did,
            "collection": "stream.thought.memory",
            "record": memory_record
        }

        response = requests.post(
            f"{pds_host}/xrpc/com.atproto.repo.createRecord",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        tags_info = f" with {len(tags)} tags" if tags else " (no tags)"
        logger.debug(f"Successfully recorded memory (length: {len(content)} chars{tags_info})")
        return result

    except Exception as e:
        logger.error(f"Error creating memory record: {e}")
        return None
1948
1949
def _fetch_all_graph_items(endpoint, actor_did: str, items_attr: str) -> list:
    """Collect every item from a cursor-paginated bsky graph endpoint.

    Args:
        endpoint: Bound client method (e.g. client.app.bsky.graph.get_followers)
        actor_did: DID of the actor whose graph is being read
        items_attr: Name of the list attribute on the response ('followers' or 'follows')

    Returns:
        Flat list of all items across every page.
    """
    items = []
    cursor = None
    while True:
        params = {'actor': actor_did, 'limit': 100}
        if cursor:
            params['cursor'] = cursor
        response = endpoint(params)
        # Response may omit the list attribute entirely; treat that as empty.
        items.extend(getattr(response, items_attr, None) or [])
        cursor = getattr(response, 'cursor', None)
        if not cursor:
            # No cursor means this was the last page.
            break
    return items


def sync_followers(client: Client, dry_run: bool = False) -> Dict[str, Any]:
    """
    Check who is following the bot and who the bot is following,
    then follow back users who aren't already followed.

    This implements the autofollow feature by creating follow records
    (app.bsky.graph.follow) for users who follow the bot.

    Args:
        client: Authenticated Bluesky client
        dry_run: If True, only report what would be done without actually following

    Returns:
        Dict with stats: {
            'followers_count': int,
            'following_count': int,
            'to_follow': List[str],       # List of handles to follow
            'newly_followed': List[str],  # List of handles actually followed (empty if dry_run)
            'errors': List[str]           # Any errors encountered
        }
        On a fatal error a dict of the form {'error': <message>} is returned instead.
    """
    try:
        from datetime import datetime, timezone

        # Get session info from the client. Different client versions expose
        # the session in different places, so probe both known locations.
        access_token = None
        user_did = None

        if hasattr(client, '_session') and client._session:
            access_token = client._session.access_jwt
            user_did = client._session.did
        elif hasattr(client, 'access_jwt'):
            access_token = client.access_jwt
            user_did = client.did if hasattr(client, 'did') else None
        else:
            logger.error("Cannot access client session information")
            return {'error': 'Cannot access client session'}

        if not access_token or not user_did:
            logger.error("Missing access token or DID from session")
            return {'error': 'Missing access token or DID'}

        # Get PDS URI from config
        from config_loader import get_bluesky_config
        bluesky_config = get_bluesky_config()
        pds_host = bluesky_config['pds_uri']

        # Fetch the COMPLETE follower/following lists. A single un-paginated
        # call returns only the first page (API default ~50 entries), which
        # would both miss followers and wrongly re-queue accounts we already
        # follow, so paginate with the cursor until exhausted.
        followers = _fetch_all_graph_items(client.app.bsky.graph.get_followers, user_did, 'followers')
        following = _fetch_all_graph_items(client.app.bsky.graph.get_follows, user_did, 'follows')

        follower_dids = {f.did for f in followers}
        following_dids = {f.did for f in following}

        # Find users who follow us but we don't follow back
        to_follow_dids = follower_dids - following_dids

        # Build result object
        result = {
            'followers_count': len(followers),
            'following_count': len(following),
            'to_follow': [],
            'newly_followed': [],
            'errors': []
        }

        # Get handles for users to follow (fall back to the DID when the
        # record carries no handle).
        to_follow_handles = []
        for follower in followers:
            if follower.did in to_follow_dids:
                handle = follower.handle if hasattr(follower, 'handle') else follower.did
                to_follow_handles.append(handle)
                result['to_follow'].append(handle)

        logger.info(f"Follower sync: {len(followers)} followers, {len(following)} following, {len(to_follow_dids)} to follow")

        # If dry run, just return the stats
        if dry_run:
            logger.info(f"Dry run - would follow: {', '.join(to_follow_handles)}")
            return result

        # Actually follow the users with rate limiting
        import requests
        headers = {"Authorization": f"Bearer {access_token}"}
        create_record_url = f"{pds_host}/xrpc/com.atproto.repo.createRecord"

        for i, did in enumerate(to_follow_dids):
            try:
                # Rate limiting: wait 2 seconds between follows to avoid spamming the server
                if i > 0:
                    time.sleep(2)

                # Create follow record (timestamps use the Z suffix convention)
                now = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
                follow_record = {
                    "$type": "app.bsky.graph.follow",
                    "subject": did,
                    "createdAt": now
                }

                create_data = {
                    "repo": user_did,
                    "collection": "app.bsky.graph.follow",
                    "record": follow_record
                }

                response = requests.post(create_record_url, headers=headers, json=create_data, timeout=10)
                response.raise_for_status()

                # Find handle for this DID for friendlier logging/reporting
                handle = next((f.handle for f in followers if f.did == did), did)
                result['newly_followed'].append(handle)
                logger.info(f"Followed: {handle}")

            except Exception as e:
                # A single failed follow should not abort the whole sync;
                # record the error and continue with the next DID.
                error_msg = f"Failed to follow {did}: {e}"
                logger.error(error_msg)
                result['errors'].append(error_msg)

        return result

    except Exception as e:
        logger.error(f"Error syncing followers: {e}")
        return {'error': str(e)}
2077
2078
if __name__ == "__main__":
    # Script entry point: authenticate and obtain a ready-to-use client.
    # default_login is defined earlier in this file — presumably it logs in
    # with credentials from the environment/config; confirm before relying on it.
    client = default_login()
    # do something with the client
    logger.info("Client is ready to use!")