a digital person for bluesky

Keep queue items when no post tool is used

Changed process_mention to return False instead of True when no
add_post_to_bluesky_reply_thread tool calls are found. This ensures
that notifications remain in the queue for retry if the agent doesn't
generate a response, rather than being discarded.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+120 -70
+120 -70
bsky.py
··· 11 from datetime import datetime 12 from collections import defaultdict 13 import time 14 15 from utils import ( 16 upsert_block, ··· 80 # Message tracking counters 81 message_counters = defaultdict(int) 82 start_time = time.time() 83 84 def export_agent_state(client, agent): 85 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" ··· 186 return void_agent 187 188 189 - def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None): 190 """Process a mention and generate a reply using the Letta agent. 191 192 Args: ··· 474 logger.debug("Successfully received response from Letta API") 475 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 476 477 - # Extract successful bluesky_reply tool calls from the agent's response 478 reply_candidates = [] 479 tool_call_results = {} # Map tool_call_id to status 480 ··· 483 # First pass: collect tool return statuses 484 for message in message_response.messages: 485 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 486 - if message.name == 'bluesky_reply': 487 tool_call_results[message.tool_call_id] = message.status 488 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 489 490 # Second pass: process messages and check for successful tool calls 491 for i, message in enumerate(message_response.messages, 1): ··· 534 logger.info("=== BOT TERMINATED BY AGENT ===") 535 exit(0) 536 537 - # Collect bluesky_reply tool calls - only if they were successful 538 if hasattr(message, 'tool_call') and message.tool_call: 539 if message.tool_call.name == 'bluesky_reply': 540 tool_call_id = message.tool_call.tool_call_id 541 tool_status = tool_call_results.get(tool_call_id, 'unknown') 542 543 if tool_status == 'success': 544 try: 545 args = json.loads(message.tool_call.arguments) 546 - # Handle both old format (message) and new format (messages) 547 - reply_messages = args.get('messages', []) 548 - if not reply_messages: 549 - # Fallback to old format for backward compatibility 550 - old_message = args.get('message', '') 551 - if old_message: 552 - reply_messages = [old_message] 553 554 - reply_lang = args.get('lang', 'en-US') 555 - if reply_messages: # Only add if there's actual content 556 - reply_candidates.append((reply_messages, reply_lang)) 557 - if len(reply_messages) == 1: 558 - logger.info(f"Found successful bluesky_reply candidate: {reply_messages[0][:50]}... (lang: {reply_lang})") 559 - else: 560 - logger.info(f"Found successful bluesky_reply thread candidate with {len(reply_messages)} messages (lang: {reply_lang})") 561 except json.JSONDecodeError as e: 562 logger.error(f"Failed to parse tool call arguments: {e}") 563 elif tool_status == 'error': 564 - logger.info(f"⚠️ Skipping failed bluesky_reply tool call (status: error)") 565 else: 566 - logger.warning(f"⚠️ Skipping bluesky_reply tool call with unknown status: {tool_status}") 567 568 if reply_candidates: 569 - logger.info(f"Found {len(reply_candidates)} successful bluesky_reply candidates, using only the first one to avoid duplicates") 570 571 - # Only use the first successful reply to avoid sending multiple responses 572 - reply_messages, reply_lang = reply_candidates[0] 573 574 # Print the generated reply for testing 575 - print(f"\n=== GENERATED REPLY (FIRST SUCCESSFUL) ===") 576 print(f"To: @{author_handle}") 577 if len(reply_messages) == 1: 578 print(f"Reply: {reply_messages[0]}") ··· 581 for j, msg in enumerate(reply_messages, 1): 582 print(f" {j}. {msg}") 583 print(f"Language: {reply_lang}") 584 - if len(reply_candidates) > 1: 585 - print(f"Note: Skipped {len(reply_candidates) - 1} additional successful candidates to avoid duplicates") 586 print(f"======================\n") 587 588 - # Send the reply(s) with language 589 - if len(reply_messages) == 1: 590 - # Single reply - use existing function 591 - cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 592 - logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 593 - response = bsky_utils.reply_to_notification( 594 - client=atproto_client, 595 - notification=notification_data, 596 - reply_text=cleaned_text, 597 - lang=reply_lang 598 - ) 599 else: 600 - # Multiple replies - use new threaded function 601 - cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 602 - logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 603 - response = bsky_utils.reply_with_thread_to_notification( 604 - client=atproto_client, 605 - notification=notification_data, 606 - reply_messages=cleaned_messages, 607 - lang=reply_lang 608 - ) 609 610 if response: 611 logger.info(f"Successfully replied to @{author_handle}") ··· 614 logger.error(f"Failed to send reply to @{author_handle}") 615 return False 616 else: 617 - logger.warning(f"No bluesky_reply tool calls found for mention from @{author_handle}, removing notification from queue") 618 - return True 619 620 except Exception as e: 621 logger.error(f"Error processing mention: {e}") ··· 702 filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json" 703 filepath = QUEUE_DIR / filename 704 705 - # Skip if already exists (duplicate) 706 - if filepath.exists(): 707 - logger.debug(f"Notification already queued: {filename}") 708 - return False 709 710 # Write to file 711 with open(filepath, 'w') as f: ··· 720 return False 721 722 723 - def load_and_process_queued_notifications(void_agent, atproto_client): 724 """Load and process all notifications from the queue in priority order.""" 725 try: 726 # Get all JSON files in queue directory (excluding processed_notifications.json) ··· 749 # Process based on type using dict data directly 750 success = False 751 if notif_data['reason'] == "mention": 752 - success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath) 753 if success: 754 message_counters['mentions'] += 1 755 elif notif_data['reason'] == "reply": 756 - success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath) 757 if success: 758 message_counters['replies'] += 1 759 elif notif_data['reason'] == "follow": ··· 779 780 # Handle file based on processing result 781 if success: 782 - filepath.unlink() 783 - logger.info(f"✅ Successfully processed and removed: {filepath.name}") 784 - 785 - # Mark as processed to avoid reprocessing 786 - processed_uris = load_processed_notifications() 787 - processed_uris.add(notif_data['uri']) 788 - save_processed_notifications(processed_uris) 789 790 elif success is None: # Special case for moving to error directory 791 error_path = QUEUE_ERROR_DIR / filepath.name ··· 808 logger.error(f"Error loading queued notifications: {e}") 809 810 811 - def process_notifications(void_agent, atproto_client): 812 """Fetch new notifications, queue them, and process the queue.""" 813 try: 814 # Get current time for marking notifications as seen ··· 882 if save_notification_to_queue(notification): 883 new_count += 1 884 885 - # Mark all notifications as seen immediately after queuing 886 - if new_count > 0: 887 - atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at}) 888 - logger.info(f"Queued {new_count} new notifications and marked as seen") 889 else: 890 - logger.debug("No new notifications to queue") 891 892 # Now process the entire queue (old + new notifications) 893 - load_and_process_queued_notifications(void_agent, atproto_client) 894 895 except Exception as e: 896 logger.error(f"Error processing notifications: {e}") 897 898 899 def main(): 900 """Main bot loop that continuously monitors for notifications.""" 901 global start_time 902 start_time = time.time() ··· 925 while True: 926 try: 927 cycle_count += 1 928 - process_notifications(void_agent, atproto_client) 929 # Log cycle completion with stats 930 elapsed_time = time.time() - start_time 931 total_messages = sum(message_counters.values())
··· 11 from datetime import datetime 12 from collections import defaultdict 13 import time 14 + import argparse 15 16 from utils import ( 17 upsert_block, ··· 81 # Message tracking counters 82 message_counters = defaultdict(int) 83 start_time = time.time() 84 + 85 + # Testing mode flag 86 + TESTING_MODE = False 87 88 def export_agent_state(client, agent): 89 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" ··· 190 return void_agent 191 192 193 + def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 194 """Process a mention and generate a reply using the Letta agent. 195 196 Args: ··· 478 logger.debug("Successfully received response from Letta API") 479 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 480 481 + # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 482 reply_candidates = [] 483 tool_call_results = {} # Map tool_call_id to status 484 ··· 487 # First pass: collect tool return statuses 488 for message in message_response.messages: 489 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 490 + if message.name == 'add_post_to_bluesky_reply_thread': 491 tool_call_results[message.tool_call_id] = message.status 492 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 493 + elif message.name == 'bluesky_reply': 494 + logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 495 + logger.error("Please use add_post_to_bluesky_reply_thread instead.") 496 + logger.error("Update the agent's tools using register_tools.py") 497 + # Export agent state before terminating 498 + export_agent_state(CLIENT, void_agent) 499 + logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 500 + exit(1) 501 502 # Second pass: process messages and check for successful tool calls 503 for i, message in enumerate(message_response.messages, 1): ··· 546 logger.info("=== BOT TERMINATED BY AGENT ===") 547 exit(0) 548 549 + # Check for deprecated bluesky_reply tool 550 if hasattr(message, 'tool_call') and message.tool_call: 551 if message.tool_call.name == 'bluesky_reply': 552 + logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 553 + logger.error("Please use add_post_to_bluesky_reply_thread instead.") 554 + logger.error("Update the agent's tools using register_tools.py") 555 + # Export agent state before terminating 556 + export_agent_state(CLIENT, void_agent) 557 + logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 558 + exit(1) 559 + 560 + # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 561 + elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 562 tool_call_id = message.tool_call.tool_call_id 563 tool_status = tool_call_results.get(tool_call_id, 'unknown') 564 565 if tool_status == 'success': 566 try: 567 args = json.loads(message.tool_call.arguments) 568 + reply_text = args.get('text', '') 569 + reply_lang = args.get('lang', 'en-US') 570 571 + if reply_text: # Only add if there's actual content 572 + reply_candidates.append((reply_text, reply_lang)) 573 + logger.info(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 574 except json.JSONDecodeError as e: 575 logger.error(f"Failed to parse tool call arguments: {e}") 576 elif tool_status == 'error': 577 + logger.info(f"⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 578 else: 579 + logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 580 581 if reply_candidates: 582 + # Aggregate reply posts into a thread 583 + reply_messages = [] 584 + reply_langs = [] 585 + for text, lang in reply_candidates: 586 + reply_messages.append(text) 587 + reply_langs.append(lang) 588 589 + # Use the first language for the entire thread (could be enhanced later) 590 + reply_lang = reply_langs[0] if reply_langs else 'en-US' 591 + 592 + logger.info(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 593 594 # Print the generated reply for testing 595 + print(f"\n=== GENERATED REPLY THREAD ===") 596 print(f"To: @{author_handle}") 597 if len(reply_messages) == 1: 598 print(f"Reply: {reply_messages[0]}") ··· 601 for j, msg in enumerate(reply_messages, 1): 602 print(f" {j}. {msg}") 603 print(f"Language: {reply_lang}") 604 print(f"======================\n") 605 606 + # Send the reply(s) with language (unless in testing mode) 607 + if testing_mode: 608 + logger.info("🧪 TESTING MODE: Skipping actual Bluesky post") 609 + response = True # Simulate success 610 else: 611 + if len(reply_messages) == 1: 612 + # Single reply - use existing function 613 + cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 614 + logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 615 + response = bsky_utils.reply_to_notification( 616 + client=atproto_client, 617 + notification=notification_data, 618 + reply_text=cleaned_text, 619 + lang=reply_lang 620 + ) 621 + else: 622 + # Multiple replies - use new threaded function 623 + cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 624 + logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 625 + response = bsky_utils.reply_with_thread_to_notification( 626 + client=atproto_client, 627 + notification=notification_data, 628 + reply_messages=cleaned_messages, 629 + lang=reply_lang 630 + ) 631 632 if response: 633 logger.info(f"Successfully replied to @{author_handle}") ··· 636 logger.error(f"Failed to send reply to @{author_handle}") 637 return False 638 else: 639 + logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, keeping notification in queue") 640 + return False 641 642 except Exception as e: 643 logger.error(f"Error processing mention: {e}") ··· 724 filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json" 725 filepath = QUEUE_DIR / filename 726 727 + # Check if this notification URI is already in the queue 728 + for existing_file in QUEUE_DIR.glob("*.json"): 729 + if existing_file.name == "processed_notifications.json": 730 + continue 731 + try: 732 + with open(existing_file, 'r') as f: 733 + existing_data = json.load(f) 734 + if existing_data.get('uri') == notification.uri: 735 + logger.debug(f"Notification already queued (URI: {notification.uri})") 736 + return False 737 + except: 738 + continue 739 740 # Write to file 741 with open(filepath, 'w') as f: ··· 750 return False 751 752 753 + def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 754 """Load and process all notifications from the queue in priority order.""" 755 try: 756 # Get all JSON files in queue directory (excluding processed_notifications.json) ··· 779 # Process based on type using dict data directly 780 success = False 781 if notif_data['reason'] == "mention": 782 + success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 783 if success: 784 message_counters['mentions'] += 1 785 elif notif_data['reason'] == "reply": 786 + success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 787 if success: 788 message_counters['replies'] += 1 789 elif notif_data['reason'] == "follow": ··· 809 810 # Handle file based on processing result 811 if success: 812 + if testing_mode: 813 + logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}") 814 + else: 815 + filepath.unlink() 816 + logger.info(f"✅ Successfully processed and removed: {filepath.name}") 817 + 818 + # Mark as processed to avoid reprocessing 819 + processed_uris = load_processed_notifications() 820 + processed_uris.add(notif_data['uri']) 821 + save_processed_notifications(processed_uris) 822 823 elif success is None: # Special case for moving to error directory 824 error_path = QUEUE_ERROR_DIR / filepath.name ··· 841 logger.error(f"Error loading queued notifications: {e}") 842 843 844 + def process_notifications(void_agent, atproto_client, testing_mode=False): 845 """Fetch new notifications, queue them, and process the queue.""" 846 try: 847 # Get current time for marking notifications as seen ··· 915 if save_notification_to_queue(notification): 916 new_count += 1 917 918 + # Mark all notifications as seen immediately after queuing (unless in testing mode) 919 + if testing_mode: 920 + logger.info("🧪 TESTING MODE: Skipping marking notifications as seen") 921 else: 922 + if new_count > 0: 923 + atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at}) 924 + logger.info(f"Queued {new_count} new notifications and marked as seen") 925 + else: 926 + logger.debug("No new notifications to queue") 927 928 # Now process the entire queue (old + new notifications) 929 + load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 930 931 except Exception as e: 932 logger.error(f"Error processing notifications: {e}") 933 934 935 def main(): 936 + # Parse command line arguments 937 + parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 938 + parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 939 + args = parser.parse_args() 940 + 941 + global TESTING_MODE 942 + TESTING_MODE = args.test 943 + 944 + if TESTING_MODE: 945 + logger.info("🧪 === RUNNING IN TESTING MODE ===") 946 + logger.info(" - No messages will be sent to Bluesky") 947 + logger.info(" - Queue files will not be deleted") 948 + logger.info(" - Notifications will not be marked as seen") 949 + print("\n") 950 """Main bot loop that continuously monitors for notifications.""" 951 global start_time 952 start_time = time.time() ··· 975 while True: 976 try: 977 cycle_count += 1 978 + process_notifications(void_agent, atproto_client, TESTING_MODE) 979 # Log cycle completion with stats 980 elapsed_time = time.time() - start_time 981 total_messages = sum(message_counters.values())