a digital person for bluesky
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20 21import bsky_utils 22from tools.blocks import attach_user_blocks, detach_user_blocks 23from datetime import date 24 25def extract_handles_from_data(data): 26 """Recursively extract all unique handles from nested data structure.""" 27 handles = set() 28 29 def _extract_recursive(obj): 30 if isinstance(obj, dict): 31 # Check if this dict has a 'handle' key 32 if 'handle' in obj: 33 handles.add(obj['handle']) 34 # Recursively check all values 35 for value in obj.values(): 36 _extract_recursive(value) 37 elif isinstance(obj, list): 38 # Recursively check all list items 39 for item in obj: 40 _extract_recursive(item) 41 42 _extract_recursive(data) 43 return list(handles) 44 45# Logging will be configured after argument parsing 46logger = None 47prompt_logger = None 48# Simple text formatting (Rich no longer used) 49SHOW_REASONING = False 50last_archival_query = "archival memory search" 51 52def log_with_panel(message, title=None, border_color="white"): 53 """Log a message with Unicode box-drawing characters""" 54 if title: 55 # Map old color names to appropriate symbols 56 symbol_map = { 57 "blue": "", # Tool calls 58 "green": "", # Success/completion 59 "yellow": "", # Reasoning 60 "red": "", # Errors 61 "white": "", # Default/mentions 62 "cyan": "", # Posts 63 } 64 symbol = symbol_map.get(border_color, "") 65 66 print(f"\n{symbol} {title}") 67 print(f" {'' * len(title)}") 68 # Indent message lines 69 for line in message.split('\n'): 70 print(f" {line}") 71 else: 72 print(message) 73 74 75# Create a client with extended timeout for LLM operations 76CLIENT= Letta( 77 token=os.environ["LETTA_API_KEY"], 78 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 79) 80 81# Use the "Bluesky" project 82PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 83 84# Notification check delay 85FETCH_NOTIFICATIONS_DELAY_SEC = 30 86 87# Check for new notifications every N queue items 88CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 5 89 90# Queue directory 91QUEUE_DIR = Path("queue") 92QUEUE_DIR.mkdir(exist_ok=True) 93QUEUE_ERROR_DIR = Path("queue/errors") 94QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 95QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 96QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 97PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 98 99# Maximum number of processed notifications to track 100MAX_PROCESSED_NOTIFICATIONS = 10000 101 102# Message tracking counters 103message_counters = defaultdict(int) 104start_time = time.time() 105 106# Testing mode flag 107TESTING_MODE = False 108 109# Skip git operations flag 110SKIP_GIT = False 111 112# Synthesis message tracking 113last_synthesis_time = time.time() 114 115def export_agent_state(client, agent, skip_git=False): 116 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 117 try: 118 # Confirm export with user unless git is being skipped 119 if not skip_git: 120 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 121 if response not in ['y', 'yes']: 122 logger.info("Agent export cancelled by user.") 123 return 124 else: 125 logger.info("Exporting agent state (git staging disabled)") 126 127 # Create directories if they don't exist 128 os.makedirs("agent_archive", exist_ok=True) 129 os.makedirs("agents", exist_ok=True) 130 131 # Export agent data 132 logger.info(f"Exporting agent {agent.id}. This takes some time...") 133 agent_data = client.agents.export_file(agent_id=agent.id) 134 135 # Save timestamped archive copy 136 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 137 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 138 with open(archive_file, 'w', encoding='utf-8') as f: 139 json.dump(agent_data, f, indent=2, ensure_ascii=False) 140 141 # Save current agent state 142 current_file = os.path.join("agents", "void.af") 143 with open(current_file, 'w', encoding='utf-8') as f: 144 json.dump(agent_data, f, indent=2, ensure_ascii=False) 145 146 logger.info(f"Agent exported to {archive_file} and {current_file}") 147 148 # Git add only the current agent file (archive is ignored) unless skip_git is True 149 if not skip_git: 150 try: 151 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 152 logger.info("Added current agent file to git staging") 153 except subprocess.CalledProcessError as e: 154 logger.warning(f"Failed to git add agent file: {e}") 155 156 except Exception as e: 157 logger.error(f"Failed to export agent: {e}") 158 159def initialize_void(): 160 logger.info("Starting void agent initialization...") 161 162 # Get the configured void agent by ID 163 logger.info("Loading void agent from config...") 164 from config_loader import get_letta_config 165 letta_config = get_letta_config() 166 agent_id = letta_config['agent_id'] 167 168 try: 169 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 170 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 171 except Exception as e: 172 logger.error(f"Failed to load void agent {agent_id}: {e}") 173 logger.error("Please ensure the agent_id in config.yaml is correct") 174 raise e 175 176 # Export agent state 177 logger.info("Exporting agent state...") 178 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 179 180 # Log agent details 181 logger.info(f"Void agent details - ID: {void_agent.id}") 182 logger.info(f"Agent name: {void_agent.name}") 183 if hasattr(void_agent, 'llm_config'): 184 logger.info(f"Agent model: {void_agent.llm_config.model}") 185 logger.info(f"Agent project_id: {void_agent.project_id}") 186 if hasattr(void_agent, 'tools'): 187 logger.info(f"Agent has {len(void_agent.tools)} tools") 188 for tool in void_agent.tools[:3]: # Show first 3 tools 189 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 190 191 return void_agent 192 193 194def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 195 """Process a mention and generate a reply using the Letta agent. 196 197 Args: 198 void_agent: The Letta agent instance 199 atproto_client: The AT Protocol client 200 notification_data: The notification data dictionary 201 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 202 203 Returns: 204 True: Successfully processed, remove from queue 205 False: Failed but retryable, keep in queue 206 None: Failed with non-retryable error, move to errors directory 207 "no_reply": No reply was generated, move to no_reply directory 208 """ 209 try: 210 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 211 212 # Handle both dict and object inputs for backwards compatibility 213 if isinstance(notification_data, dict): 214 uri = notification_data['uri'] 215 mention_text = notification_data.get('record', {}).get('text', '') 216 author_handle = notification_data['author']['handle'] 217 author_name = notification_data['author'].get('display_name') or author_handle 218 else: 219 # Legacy object access 220 uri = notification_data.uri 221 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 222 author_handle = notification_data.author.handle 223 author_name = notification_data.author.display_name or author_handle 224 225 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 226 227 # Retrieve the entire thread associated with the mention 228 try: 229 thread = atproto_client.app.bsky.feed.get_post_thread({ 230 'uri': uri, 231 'parent_height': 40, 232 'depth': 10 233 }) 234 except Exception as e: 235 error_str = str(e) 236 # Check if this is a NotFound error 237 if 'NotFound' in error_str or 'Post not found' in error_str: 238 logger.warning(f"Post not found for URI {uri}, removing from queue") 239 return True # Return True to remove from queue 240 else: 241 # Re-raise other errors 242 logger.error(f"Error fetching thread: {e}") 243 raise 244 245 # Get thread context as YAML string 246 logger.debug("Converting thread to YAML string") 247 try: 248 thread_context = thread_to_yaml_string(thread) 249 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 250 251 # Check if #voidstop appears anywhere in the thread 252 if "#voidstop" in thread_context.lower(): 253 logger.info("Found #voidstop in thread context, skipping this mention") 254 return True # Return True to remove from queue 255 256 # Also check the mention text directly 257 if "#voidstop" in mention_text.lower(): 258 logger.info("Found #voidstop in mention text, skipping this mention") 259 return True # Return True to remove from queue 260 261 # Create a more informative preview by extracting meaningful content 262 lines = thread_context.split('\n') 263 meaningful_lines = [] 264 265 for line in lines: 266 stripped = line.strip() 267 if not stripped: 268 continue 269 270 # Look for lines with actual content (not just structure) 271 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 272 meaningful_lines.append(line) 273 if len(meaningful_lines) >= 5: 274 break 275 276 if meaningful_lines: 277 preview = '\n'.join(meaningful_lines) 278 logger.debug(f"Thread content preview:\n{preview}") 279 else: 280 # If no content fields found, just show it's a thread structure 281 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 282 except Exception as yaml_error: 283 import traceback 284 logger.error(f"Error converting thread to YAML: {yaml_error}") 285 logger.error(f"Full traceback:\n{traceback.format_exc()}") 286 logger.error(f"Thread type: {type(thread)}") 287 if hasattr(thread, '__dict__'): 288 logger.error(f"Thread attributes: {thread.__dict__}") 289 # Try to continue with a simple context 290 thread_context = f"Error processing thread context: {str(yaml_error)}" 291 292 # Create a prompt for the Letta agent with thread context 293 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 294 295MOST RECENT POST (the mention you're responding to): 296"{mention_text}" 297 298FULL THREAD CONTEXT: 299```yaml 300{thread_context} 301``` 302 303The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 304 305To reply, use the add_post_to_bluesky_reply_thread tool: 306- Each call creates one post (max 300 characters) 307- For most responses, a single call is sufficient 308- Only use multiple calls for threaded replies when: 309 * The topic requires extended explanation that cannot fit in 300 characters 310 * You're explicitly asked for a detailed/long response 311 * The conversation naturally benefits from a structured multi-part answer 312- Avoid unnecessary threads - be concise when possible""" 313 314 # Extract all handles from notification and thread data 315 all_handles = set() 316 all_handles.update(extract_handles_from_data(notification_data)) 317 all_handles.update(extract_handles_from_data(thread.model_dump())) 318 unique_handles = list(all_handles) 319 320 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 321 322 # Check if any handles are in known_bots list 323 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 324 import json 325 326 try: 327 # Check for known bots in thread 328 bot_check_result = check_known_bots(unique_handles, void_agent) 329 bot_check_data = json.loads(bot_check_result) 330 331 if bot_check_data.get("bot_detected", False): 332 detected_bots = bot_check_data.get("detected_bots", []) 333 logger.info(f"Bot detected in thread: {detected_bots}") 334 335 # Decide whether to respond (10% chance) 336 if not should_respond_to_bot_thread(): 337 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 338 # Return False to keep in queue for potential later processing 339 return False 340 else: 341 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 342 else: 343 logger.debug("No known bots detected in thread") 344 345 except Exception as bot_check_error: 346 logger.warning(f"Error checking for bots: {bot_check_error}") 347 # Continue processing if bot check fails 348 349 # Attach user blocks before agent call 350 attached_handles = [] 351 if unique_handles: 352 try: 353 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 354 attach_result = attach_user_blocks(unique_handles, void_agent) 355 attached_handles = unique_handles # Track successfully attached handles 356 logger.debug(f"Attach result: {attach_result}") 357 except Exception as attach_error: 358 logger.warning(f"Failed to attach user blocks: {attach_error}") 359 # Continue without user blocks rather than failing completely 360 361 # Get response from Letta agent 362 # Format with Unicode characters 363 title = f"MENTION FROM @{author_handle}" 364 print(f"\n{title}") 365 print(f" {'' * len(title)}") 366 # Indent the mention text 367 for line in mention_text.split('\n'): 368 print(f" {line}") 369 370 # Log prompt details to separate logger 371 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 372 373 # Log concise prompt info to main logger 374 thread_handles_count = len(unique_handles) 375 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 376 377 try: 378 # Use streaming to avoid 524 timeout errors 379 message_stream = CLIENT.agents.messages.create_stream( 380 agent_id=void_agent.id, 381 messages=[{"role": "user", "content": prompt}], 382 stream_tokens=False, # Step streaming only (faster than token streaming) 383 max_steps=100 384 ) 385 386 # Collect the streaming response 387 all_messages = [] 388 for chunk in message_stream: 389 # Log condensed chunk info 390 if hasattr(chunk, 'message_type'): 391 if chunk.message_type == 'reasoning_message': 392 # Show full reasoning without truncation 393 if SHOW_REASONING: 394 # Format with Unicode characters 395 print("\n◆ Reasoning") 396 print(" ─────────") 397 # Indent reasoning lines 398 for line in chunk.reasoning.split('\n'): 399 print(f" {line}") 400 else: 401 # Default log format (only when --reasoning is used due to log level) 402 # Format with Unicode characters 403 print("\n◆ Reasoning") 404 print(" ─────────") 405 # Indent reasoning lines 406 for line in chunk.reasoning.split('\n'): 407 print(f" {line}") 408 elif chunk.message_type == 'tool_call_message': 409 # Parse tool arguments for better display 410 tool_name = chunk.tool_call.name 411 try: 412 args = json.loads(chunk.tool_call.arguments) 413 # Format based on tool type 414 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 415 # Extract the text being posted 416 text = args.get('text', '') 417 if text: 418 # Format with Unicode characters 419 print("\n✎ Bluesky Post") 420 print(" ────────────") 421 # Indent post text 422 for line in text.split('\n'): 423 print(f" {line}") 424 else: 425 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 426 elif tool_name == 'archival_memory_search': 427 query = args.get('query', 'unknown') 428 global last_archival_query 429 last_archival_query = query 430 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 431 elif tool_name == 'archival_memory_insert': 432 content = args.get('content', '') 433 # Show the full content being inserted 434 log_with_panel(content, f"Tool call: {tool_name}", "blue") 435 elif tool_name == 'update_block': 436 label = args.get('label', 'unknown') 437 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 438 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 439 else: 440 # Generic display for other tools 441 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 442 if len(args_str) > 150: 443 args_str = args_str[:150] + "..." 444 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 445 except: 446 # Fallback to original format if parsing fails 447 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 448 elif chunk.message_type == 'tool_return_message': 449 # Enhanced tool result logging 450 tool_name = chunk.name 451 status = chunk.status 452 453 if status == 'success': 454 # Try to show meaningful result info based on tool type 455 if hasattr(chunk, 'tool_return') and chunk.tool_return: 456 result_str = str(chunk.tool_return) 457 if tool_name == 'archival_memory_search': 458 459 try: 460 # Handle both string and list formats 461 if isinstance(chunk.tool_return, str): 462 # The string format is: "([{...}, {...}], count)" 463 # We need to extract just the list part 464 if chunk.tool_return.strip(): 465 # Find the list part between the first [ and last ] 466 start_idx = chunk.tool_return.find('[') 467 end_idx = chunk.tool_return.rfind(']') 468 if start_idx != -1 and end_idx != -1: 469 list_str = chunk.tool_return[start_idx:end_idx+1] 470 # Use ast.literal_eval since this is Python literal syntax, not JSON 471 import ast 472 results = ast.literal_eval(list_str) 473 else: 474 logger.warning("Could not find list in archival_memory_search result") 475 results = [] 476 else: 477 logger.warning("Empty string returned from archival_memory_search") 478 results = [] 479 else: 480 # If it's already a list, use directly 481 results = chunk.tool_return 482 483 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 484 485 # Use the captured search query from the tool call 486 search_query = last_archival_query 487 488 # Combine all results into a single text block 489 content_text = "" 490 for i, entry in enumerate(results, 1): 491 timestamp = entry.get('timestamp', 'N/A') 492 content = entry.get('content', '') 493 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 494 495 # Format with Unicode characters 496 title = f"{search_query} ({len(results)} results)" 497 print(f"\n{title}") 498 print(f" {'' * len(title)}") 499 # Indent content text 500 for line in content_text.strip().split('\n'): 501 print(f" {line}") 502 503 except Exception as e: 504 logger.error(f"Error formatting archival memory results: {e}") 505 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 506 elif tool_name == 'add_post_to_bluesky_reply_thread': 507 # Just show success for bluesky posts, the text was already shown in tool call 508 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 509 elif tool_name == 'archival_memory_insert': 510 # Skip archival memory insert results (always returns None) 511 pass 512 elif tool_name == 'update_block': 513 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 514 else: 515 # Generic success with preview 516 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 517 log_with_panel(preview, f"Tool result: {tool_name}", "green") 518 else: 519 log_with_panel("Success", f"Tool result: {tool_name}", "green") 520 elif status == 'error': 521 # Show error details 522 if tool_name == 'add_post_to_bluesky_reply_thread': 523 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 524 log_with_panel(error_str, f"Bluesky Post ✗", "red") 525 elif tool_name == 'archival_memory_insert': 526 # Skip archival memory insert errors too 527 pass 528 else: 529 error_preview = "" 530 if hasattr(chunk, 'tool_return') and chunk.tool_return: 531 error_str = str(chunk.tool_return) 532 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 533 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 534 else: 535 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 536 else: 537 logger.info(f"Tool result: {tool_name} - {status}") 538 elif chunk.message_type == 'assistant_message': 539 # Format with Unicode characters 540 print("\n▶ Assistant Response") 541 print(" ──────────────────") 542 # Indent response text 543 for line in chunk.content.split('\n'): 544 print(f" {line}") 545 else: 546 # Filter out verbose message types 547 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 548 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 549 else: 550 logger.info(f"📦 Stream status: {chunk}") 551 552 # Log full chunk for debugging 553 logger.debug(f"Full streaming chunk: {chunk}") 554 all_messages.append(chunk) 555 if str(chunk) == 'done': 556 break 557 558 # Convert streaming response to standard format for compatibility 559 message_response = type('StreamingResponse', (), { 560 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 561 })() 562 except Exception as api_error: 563 import traceback 564 error_str = str(api_error) 565 logger.error(f"Letta API error: {api_error}") 566 logger.error(f"Error type: {type(api_error).__name__}") 567 logger.error(f"Full traceback:\n{traceback.format_exc()}") 568 logger.error(f"Mention text was: {mention_text}") 569 logger.error(f"Author: @{author_handle}") 570 logger.error(f"URI: {uri}") 571 572 573 # Try to extract more info from different error types 574 if hasattr(api_error, 'response'): 575 logger.error(f"Error response object exists") 576 if hasattr(api_error.response, 'text'): 577 logger.error(f"Response text: {api_error.response.text}") 578 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 579 try: 580 logger.error(f"Response JSON: {api_error.response.json()}") 581 except: 582 pass 583 584 # Check for specific error types 585 if hasattr(api_error, 'status_code'): 586 logger.error(f"API Status code: {api_error.status_code}") 587 if hasattr(api_error, 'body'): 588 logger.error(f"API Response body: {api_error.body}") 589 if hasattr(api_error, 'headers'): 590 logger.error(f"API Response headers: {api_error.headers}") 591 592 if api_error.status_code == 413: 593 logger.error("413 Payload Too Large - moving to errors directory") 594 return None # Move to errors directory - payload is too large to ever succeed 595 elif api_error.status_code == 524: 596 logger.error("524 error - timeout from Cloudflare, will retry later") 597 return False # Keep in queue for retry 598 599 # Check if error indicates we should remove from queue 600 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 601 logger.warning("Payload too large error, moving to errors directory") 602 return None # Move to errors directory - cannot be fixed by retry 603 elif 'status_code: 524' in error_str: 604 logger.warning("524 timeout error, keeping in queue for retry") 605 return False # Keep in queue for retry 606 607 raise 608 609 # Log successful response 610 logger.debug("Successfully received response from Letta API") 611 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 612 613 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 614 reply_candidates = [] 615 tool_call_results = {} # Map tool_call_id to status 616 ack_note = None # Track any note from annotate_ack tool 617 618 logger.debug(f"Processing {len(message_response.messages)} response messages...") 619 620 # First pass: collect tool return statuses 621 ignored_notification = False 622 ignore_reason = "" 623 ignore_category = "" 624 625 for message in message_response.messages: 626 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 627 if message.name == 'add_post_to_bluesky_reply_thread': 628 tool_call_results[message.tool_call_id] = message.status 629 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 630 elif message.name == 'ignore_notification': 631 # Check if the tool was successful 632 if hasattr(message, 'tool_return') and message.status == 'success': 633 # Parse the return value to extract category and reason 634 result_str = str(message.tool_return) 635 if 'IGNORED_NOTIFICATION::' in result_str: 636 parts = result_str.split('::') 637 if len(parts) >= 3: 638 ignore_category = parts[1] 639 ignore_reason = parts[2] 640 ignored_notification = True 641 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 642 elif message.name == 'bluesky_reply': 643 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 644 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 645 logger.error("Update the agent's tools using register_tools.py") 646 # Export agent state before terminating 647 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 648 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 649 exit(1) 650 651 # Second pass: process messages and check for successful tool calls 652 for i, message in enumerate(message_response.messages, 1): 653 # Log concise message info instead of full object 654 msg_type = getattr(message, 'message_type', 'unknown') 655 if hasattr(message, 'reasoning') and message.reasoning: 656 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 657 elif hasattr(message, 'tool_call') and message.tool_call: 658 tool_name = message.tool_call.name 659 logger.debug(f" {i}. {msg_type}: {tool_name}") 660 elif hasattr(message, 'tool_return'): 661 tool_name = getattr(message, 'name', 'unknown_tool') 662 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 663 status = getattr(message, 'status', 'unknown') 664 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 665 elif hasattr(message, 'text'): 666 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 667 else: 668 logger.debug(f" {i}. {msg_type}: <no content>") 669 670 # Check for halt_activity tool call 671 if hasattr(message, 'tool_call') and message.tool_call: 672 if message.tool_call.name == 'halt_activity': 673 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 674 try: 675 args = json.loads(message.tool_call.arguments) 676 reason = args.get('reason', 'Agent requested halt') 677 logger.info(f"Halt reason: {reason}") 678 except: 679 logger.info("Halt reason: <unable to parse>") 680 681 # Delete the queue file before terminating 682 if queue_filepath and queue_filepath.exists(): 683 queue_filepath.unlink() 684 logger.info(f"Deleted queue file: {queue_filepath.name}") 685 686 # Also mark as processed to avoid reprocessing 687 processed_uris = load_processed_notifications() 688 processed_uris.add(notification_data.get('uri', '')) 689 save_processed_notifications(processed_uris) 690 691 # Export agent state before terminating 692 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 693 694 # Exit the program 695 logger.info("=== BOT TERMINATED BY AGENT ===") 696 exit(0) 697 698 # Check for deprecated bluesky_reply tool 699 if hasattr(message, 'tool_call') and message.tool_call: 700 if message.tool_call.name == 'bluesky_reply': 701 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 702 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 703 logger.error("Update the agent's tools using register_tools.py") 704 # Export agent state before terminating 705 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 706 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 707 exit(1) 708 709 # Collect annotate_ack tool calls 710 elif message.tool_call.name == 'annotate_ack': 711 try: 712 args = json.loads(message.tool_call.arguments) 713 note = args.get('note', '') 714 if note: 715 ack_note = note 716 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 717 except json.JSONDecodeError as e: 718 logger.error(f"Failed to parse annotate_ack arguments: {e}") 719 720 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 721 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 722 tool_call_id = message.tool_call.tool_call_id 723 tool_status = tool_call_results.get(tool_call_id, 'unknown') 724 725 if tool_status == 'success': 726 try: 727 args = json.loads(message.tool_call.arguments) 728 reply_text = args.get('text', '') 729 reply_lang = args.get('lang', 'en-US') 730 731 if reply_text: # Only add if there's actual content 732 reply_candidates.append((reply_text, reply_lang)) 733 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 734 except json.JSONDecodeError as e: 735 logger.error(f"Failed to parse tool call arguments: {e}") 736 elif tool_status == 'error': 737 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 738 else: 739 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 740 741 # Check for conflicting tool calls 742 if reply_candidates and ignored_notification: 743 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 744 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 745 logger.warning("Item will be left in queue for manual review") 746 # Return False to keep in queue 747 return False 748 749 if reply_candidates: 750 # Aggregate reply posts into a thread 751 reply_messages = [] 752 reply_langs = [] 753 for text, lang in reply_candidates: 754 reply_messages.append(text) 755 reply_langs.append(lang) 756 757 # Use the first language for the entire thread (could be enhanced later) 758 reply_lang = reply_langs[0] if reply_langs else 'en-US' 759 760 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 761 762 # Display the generated reply thread 763 if len(reply_messages) == 1: 764 content = reply_messages[0] 765 title = f"Reply to @{author_handle}" 766 else: 767 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 768 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 769 770 # Format with Unicode characters 771 print(f"\n{title}") 772 print(f" {'' * len(title)}") 773 # Indent content lines 774 for line in content.split('\n'): 775 print(f" {line}") 776 777 # Send the reply(s) with language (unless in testing mode) 778 if testing_mode: 779 logger.info("TESTING MODE: Skipping actual Bluesky post") 780 response = True # Simulate success 781 else: 782 if len(reply_messages) == 1: 783 # Single reply - use existing function 784 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 785 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 786 response = bsky_utils.reply_to_notification( 787 client=atproto_client, 788 notification=notification_data, 789 reply_text=cleaned_text, 790 lang=reply_lang 791 ) 792 else: 793 # Multiple replies - use new threaded function 794 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 795 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 796 response = bsky_utils.reply_with_thread_to_notification( 797 client=atproto_client, 798 notification=notification_data, 799 reply_messages=cleaned_messages, 800 lang=reply_lang 801 ) 802 803 if response: 804 logger.info(f"Successfully replied to @{author_handle}") 805 806 # Acknowledge the post we're replying to with stream.thought.ack 807 try: 808 post_uri = notification_data.get('uri') 809 post_cid = notification_data.get('cid') 810 811 if post_uri and post_cid: 812 ack_result = bsky_utils.acknowledge_post( 813 client=atproto_client, 814 post_uri=post_uri, 815 post_cid=post_cid, 816 note=ack_note 817 ) 818 if ack_result: 819 if ack_note: 820 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 821 else: 822 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 823 else: 824 logger.warning(f"Failed to acknowledge post from @{author_handle}") 825 else: 826 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 827 except Exception as e: 828 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 829 # Don't fail the entire operation if acknowledgment fails 830 831 return True 832 else: 833 logger.error(f"Failed to send reply to @{author_handle}") 834 return False 835 else: 836 # Check if notification was explicitly ignored 837 if ignored_notification: 838 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") 839 return "ignored" 840 else: 841 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 842 return "no_reply" 843 844 except Exception as e: 845 logger.error(f"Error processing mention: {e}") 846 return False 847 finally: 848 # Detach user blocks after agent response (success or failure) 849 if 'attached_handles' in locals() and attached_handles: 850 try: 851 logger.info(f"Detaching user blocks for handles: {attached_handles}") 852 detach_result = detach_user_blocks(attached_handles, void_agent) 853 logger.debug(f"Detach result: {detach_result}") 854 except Exception as detach_error: 855 logger.warning(f"Failed to detach user blocks: {detach_error}") 856 857 858def notification_to_dict(notification): 859 """Convert a notification object to a dictionary for JSON serialization.""" 860 return { 861 'uri': notification.uri, 862 'cid': notification.cid, 863 'reason': notification.reason, 864 'is_read': notification.is_read, 865 'indexed_at': notification.indexed_at, 866 'author': { 867 'handle': notification.author.handle, 868 'display_name': notification.author.display_name, 869 'did': notification.author.did 870 }, 871 'record': { 872 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 873 } 874 } 875 876 877def load_processed_notifications(): 878 """Load the set of processed notification URIs.""" 879 if PROCESSED_NOTIFICATIONS_FILE.exists(): 880 try: 881 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 882 data = json.load(f) 883 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 884 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 885 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 886 save_processed_notifications(data) 887 return set(data) 888 except Exception as e: 889 logger.error(f"Error loading processed notifications: {e}") 890 return set() 891 892 893def save_processed_notifications(processed_set): 894 """Save the set of processed notification URIs.""" 895 try: 896 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 897 json.dump(list(processed_set), f) 898 except Exception as e: 899 logger.error(f"Error saving processed notifications: {e}") 900 901 902def save_notification_to_queue(notification, is_priority=None): 903 """Save a notification to the queue directory with priority-based filename.""" 904 try: 905 # Check if already processed 906 processed_uris = load_processed_notifications() 907 908 # Handle both notification objects and dicts 909 if isinstance(notification, dict): 910 notif_dict = notification 911 notification_uri = notification.get('uri') 912 else: 913 notif_dict = notification_to_dict(notification) 914 notification_uri = notification.uri 915 916 if notification_uri in processed_uris: 917 logger.debug(f"Notification already processed: {notification_uri}") 918 return False 919 920 # Create JSON string 921 notif_json = json.dumps(notif_dict, sort_keys=True) 922 923 # Generate hash for filename (to avoid duplicates) 924 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 925 926 # Determine priority based on author handle or explicit priority 927 if is_priority is not None: 928 priority_prefix = "0_" if is_priority else "1_" 929 else: 930 if isinstance(notification, dict): 931 author_handle = notification.get('author', {}).get('handle', '') 932 else: 933 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 934 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 935 936 # Create filename with priority, timestamp and hash 937 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 938 reason = notif_dict.get('reason', 'unknown') 939 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 940 filepath = QUEUE_DIR / filename 941 942 # Check if this notification URI is already in the queue 943 for existing_file in QUEUE_DIR.glob("*.json"): 944 if existing_file.name == "processed_notifications.json": 945 continue 946 try: 947 with open(existing_file, 'r') as f: 948 existing_data = json.load(f) 949 if existing_data.get('uri') == notification_uri: 950 logger.debug(f"Notification already queued (URI: {notification_uri})") 951 return False 952 except: 953 continue 954 955 # Write to file 956 with open(filepath, 'w') as f: 957 json.dump(notif_dict, f, indent=2) 958 959 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 960 logger.info(f"Queued notification ({priority_label}): {filename}") 961 return True 962 963 except Exception as e: 964 logger.error(f"Error saving notification to queue: {e}") 965 return False 966 967 968def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 969 """Load and process all notifications from the queue in priority order.""" 970 try: 971 # Get all JSON files in queue directory (excluding processed_notifications.json) 972 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 973 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 974 975 # Filter out and delete like notifications immediately 976 queue_files = [] 977 likes_deleted = 0 978 979 for filepath in all_queue_files: 980 try: 981 with open(filepath, 'r') as f: 982 notif_data = json.load(f) 983 984 # If it's a like, delete it immediately and don't process 985 if notif_data.get('reason') == 'like': 986 filepath.unlink() 987 likes_deleted += 1 988 logger.debug(f"Deleted like notification: {filepath.name}") 989 else: 990 queue_files.append(filepath) 991 except Exception as e: 992 logger.warning(f"Error checking notification file {filepath.name}: {e}") 993 queue_files.append(filepath) # Keep it in case it's valid 994 995 if likes_deleted > 0: 996 logger.info(f"Deleted {likes_deleted} like notifications from queue") 997 998 if not queue_files: 999 return 1000 1001 logger.info(f"Processing {len(queue_files)} queued notifications") 1002 1003 # Log current statistics 1004 elapsed_time = time.time() - start_time 1005 total_messages = sum(message_counters.values()) 1006 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1007 1008 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1009 1010 for i, filepath in enumerate(queue_files, 1): 1011 # Check for new notifications periodically during queue processing 1012 if i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1: 1013 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1014 try: 1015 # Fetch and queue new notifications without processing them 1016 new_count = fetch_and_queue_new_notifications(atproto_client) 1017 1018 if new_count > 0: 1019 logger.info(f"Added {new_count} new notifications to queue") 1020 # Reload the queue files to include the new items 1021 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1022 queue_files = updated_queue_files 1023 logger.info(f"Queue updated: now {len(queue_files)} total items") 1024 except Exception as e: 1025 logger.error(f"Error checking for new notifications: {e}") 1026 1027 logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}") 1028 try: 1029 # Load notification data 1030 with open(filepath, 'r') as f: 1031 notif_data = json.load(f) 1032 1033 # Process based on type using dict data directly 1034 success = False 1035 if notif_data['reason'] == "mention": 1036 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1037 if success: 1038 message_counters['mentions'] += 1 1039 elif notif_data['reason'] == "reply": 1040 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1041 if success: 1042 message_counters['replies'] += 1 1043 elif notif_data['reason'] == "follow": 1044 author_handle = notif_data['author']['handle'] 1045 author_display_name = notif_data['author'].get('display_name', 'no display name') 1046 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1047 logger.info(f"Notifying agent about new follower: @{author_handle}") 1048 CLIENT.agents.messages.create( 1049 agent_id = void_agent.id, 1050 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 1051 ) 1052 success = True # Follow updates are always successful 1053 if success: 1054 message_counters['follows'] += 1 1055 elif notif_data['reason'] == "repost": 1056 # Skip reposts silently 1057 success = True # Skip reposts but mark as successful to remove from queue 1058 if success: 1059 message_counters['reposts_skipped'] += 1 1060 elif notif_data['reason'] == "like": 1061 # Skip likes silently 1062 success = True # Skip likes but mark as successful to remove from queue 1063 if success: 1064 message_counters.setdefault('likes_skipped', 0) 1065 message_counters['likes_skipped'] += 1 1066 else: 1067 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1068 success = True # Remove unknown types from queue 1069 1070 # Handle file based on processing result 1071 if success: 1072 if testing_mode: 1073 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1074 else: 1075 filepath.unlink() 1076 logger.info(f"Successfully processed and removed: {filepath.name}") 1077 1078 # Mark as processed to avoid reprocessing 1079 processed_uris = load_processed_notifications() 1080 processed_uris.add(notif_data['uri']) 1081 save_processed_notifications(processed_uris) 1082 1083 elif success is None: # Special case for moving to error directory 1084 error_path = QUEUE_ERROR_DIR / filepath.name 1085 filepath.rename(error_path) 1086 logger.warning(f"Moved {filepath.name} to errors directory") 1087 1088 # Also mark as processed to avoid retrying 1089 processed_uris = load_processed_notifications() 1090 processed_uris.add(notif_data['uri']) 1091 save_processed_notifications(processed_uris) 1092 1093 elif success == "no_reply": # Special case for moving to no_reply directory 1094 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1095 filepath.rename(no_reply_path) 1096 logger.info(f"Moved {filepath.name} to no_reply directory") 1097 1098 # Also mark as processed to avoid retrying 1099 processed_uris = load_processed_notifications() 1100 processed_uris.add(notif_data['uri']) 1101 save_processed_notifications(processed_uris) 1102 1103 elif success == "ignored": # Special case for explicitly ignored notifications 1104 # For ignored notifications, we just delete them (not move to no_reply) 1105 filepath.unlink() 1106 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1107 1108 # Also mark as processed to avoid retrying 1109 processed_uris = load_processed_notifications() 1110 processed_uris.add(notif_data['uri']) 1111 save_processed_notifications(processed_uris) 1112 1113 else: 1114 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1115 1116 except Exception as e: 1117 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1118 # Keep the file for retry later 1119 1120 except Exception as e: 1121 logger.error(f"Error loading queued notifications: {e}") 1122 1123 1124def fetch_and_queue_new_notifications(atproto_client): 1125 """Fetch new notifications and queue them without processing.""" 1126 try: 1127 # Get current time for marking notifications as seen 1128 logger.debug("Getting current time for notification marking...") 1129 last_seen_at = atproto_client.get_current_time_iso() 1130 1131 # Fetch ALL notifications using pagination 1132 all_notifications = [] 1133 cursor = None 1134 page_count = 0 1135 max_pages = 20 # Safety limit to prevent infinite loops 1136 1137 while page_count < max_pages: 1138 try: 1139 # Fetch notifications page 1140 if cursor: 1141 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1142 params={'cursor': cursor, 'limit': 100} 1143 ) 1144 else: 1145 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1146 params={'limit': 100} 1147 ) 1148 1149 page_count += 1 1150 page_notifications = notifications_response.notifications 1151 1152 if not page_notifications: 1153 break 1154 1155 all_notifications.extend(page_notifications) 1156 1157 # Check if there are more pages 1158 cursor = getattr(notifications_response, 'cursor', None) 1159 if not cursor: 1160 break 1161 1162 except Exception as e: 1163 logger.error(f"Error fetching notifications page {page_count}: {e}") 1164 break 1165 1166 # Now process all fetched notifications 1167 new_count = 0 1168 if all_notifications: 1169 # Mark as seen first 1170 try: 1171 atproto_client.app.bsky.notification.update_seen( 1172 data={'seenAt': last_seen_at} 1173 ) 1174 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1175 except Exception as e: 1176 logger.error(f"Error marking notifications as seen: {e}") 1177 1178 # Queue all new notifications (except likes and already read) 1179 for notif in all_notifications: 1180 # Skip if already read or if it's a like 1181 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'): 1182 continue 1183 1184 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1185 1186 # Skip likes in dict form too 1187 if notif_dict.get('reason') == 'like': 1188 continue 1189 1190 # Check if it's a priority notification 1191 is_priority = False 1192 1193 # Priority for cameron.pfiffer.org notifications 1194 author_handle = notif_dict.get('author', {}).get('handle', '') 1195 if author_handle == "cameron.pfiffer.org": 1196 is_priority = True 1197 1198 # Also check for priority keywords in mentions 1199 if notif_dict.get('reason') == 'mention': 1200 # Get the mention text to check for priority keywords 1201 record = notif_dict.get('record', {}) 1202 text = record.get('text', '') 1203 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1204 is_priority = True 1205 1206 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1207 new_count += 1 1208 1209 logger.info(f"Queued {new_count} new notifications and marked as seen") 1210 else: 1211 logger.debug("No new notifications to queue") 1212 1213 return new_count 1214 1215 except Exception as e: 1216 logger.error(f"Error fetching and queueing notifications: {e}") 1217 return 0 1218 1219 1220def process_notifications(void_agent, atproto_client, testing_mode=False): 1221 """Fetch new notifications, queue them, and process the queue.""" 1222 try: 1223 # Fetch and queue new notifications 1224 new_count = fetch_and_queue_new_notifications(atproto_client) 1225 1226 if new_count > 0: 1227 logger.info(f"Found {new_count} new notifications to process") 1228 1229 # Now process the entire queue (old + new notifications) 1230 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1231 1232 except Exception as e: 1233 logger.error(f"Error processing notifications: {e}") 1234 1235 1236def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None: 1237 """ 1238 Send a synthesis message to the agent every 10 minutes. 1239 This prompts the agent to synthesize its recent experiences. 1240 1241 Args: 1242 client: Letta client 1243 agent_id: Agent ID to send synthesis to 1244 atproto_client: Optional AT Protocol client for posting synthesis results 1245 """ 1246 # Track attached temporal blocks for cleanup 1247 attached_temporal_labels = [] 1248 1249 try: 1250 logger.info("🧠 Preparing synthesis with temporal journal blocks") 1251 1252 # Attach temporal blocks before synthesis 1253 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id) 1254 if not success: 1255 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway") 1256 1257 # Create enhanced synthesis prompt 1258 today = date.today() 1259 synthesis_prompt = f"""Time for synthesis and reflection. 1260 1261You have access to temporal journal blocks for recording your thoughts and experiences: 1262- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1263- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1264- void_year_{today.year}: This year's journal ({today.year}) 1265 1266These journal blocks are attached temporarily for this synthesis session. Use them to: 12671. Record significant interactions and insights from recent experiences 12682. Track patterns in conversations and user behaviors 12693. Note your evolving understanding of the digital social environment 12704. Reflect on your growth and changes in perspective 12715. Document memorable moments or interesting discoveries 1272 1273The journal entries should be cumulative - add to existing content rather than replacing it. 1274Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly). 1275 1276After recording in your journals, synthesize your recent experiences into your core memory blocks 1277(zeitgeist, void-persona, void-humans) as you normally would. 1278 1279Begin your synthesis and journaling now.""" 1280 1281 logger.info("🧠 Sending enhanced synthesis prompt to agent") 1282 1283 # Send synthesis message with streaming to show tool use 1284 message_stream = client.agents.messages.create_stream( 1285 agent_id=agent_id, 1286 messages=[{"role": "user", "content": synthesis_prompt}], 1287 stream_tokens=False, 1288 max_steps=100 1289 ) 1290 1291 # Track synthesis content for potential posting 1292 synthesis_posts = [] 1293 ack_note = None 1294 1295 # Process the streaming response 1296 for chunk in message_stream: 1297 if hasattr(chunk, 'message_type'): 1298 if chunk.message_type == 'reasoning_message': 1299 if SHOW_REASONING: 1300 print("\n◆ Reasoning") 1301 print(" ─────────") 1302 for line in chunk.reasoning.split('\n'): 1303 print(f" {line}") 1304 elif chunk.message_type == 'tool_call_message': 1305 tool_name = chunk.tool_call.name 1306 try: 1307 args = json.loads(chunk.tool_call.arguments) 1308 if tool_name == 'archival_memory_search': 1309 query = args.get('query', 'unknown') 1310 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1311 elif tool_name == 'archival_memory_insert': 1312 content = args.get('content', '') 1313 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1314 elif tool_name == 'update_block': 1315 label = args.get('label', 'unknown') 1316 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1317 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1318 elif tool_name == 'annotate_ack': 1319 note = args.get('note', '') 1320 if note: 1321 ack_note = note 1322 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1323 elif tool_name == 'add_post_to_bluesky_reply_thread': 1324 text = args.get('text', '') 1325 synthesis_posts.append(text) 1326 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1327 else: 1328 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1329 if len(args_str) > 150: 1330 args_str = args_str[:150] + "..." 1331 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1332 except: 1333 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1334 elif chunk.message_type == 'tool_return_message': 1335 if chunk.status == 'success': 1336 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1337 else: 1338 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1339 elif chunk.message_type == 'assistant_message': 1340 print("\n▶ Synthesis Response") 1341 print(" ──────────────────") 1342 for line in chunk.content.split('\n'): 1343 print(f" {line}") 1344 1345 if str(chunk) == 'done': 1346 break 1347 1348 logger.info("🧠 Synthesis message processed successfully") 1349 1350 # Handle synthesis acknowledgments if we have an atproto client 1351 if atproto_client and ack_note: 1352 try: 1353 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1354 if result: 1355 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1356 else: 1357 logger.warning("Failed to create synthesis acknowledgment") 1358 except Exception as e: 1359 logger.error(f"Error creating synthesis acknowledgment: {e}") 1360 1361 # Handle synthesis posts if any were generated 1362 if atproto_client and synthesis_posts: 1363 try: 1364 for post_text in synthesis_posts: 1365 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1366 response = bsky_utils.send_post(atproto_client, cleaned_text) 1367 if response: 1368 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1369 else: 1370 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1371 except Exception as e: 1372 logger.error(f"Error posting synthesis content: {e}") 1373 1374 except Exception as e: 1375 logger.error(f"Error sending synthesis message: {e}") 1376 finally: 1377 # Always detach temporal blocks after synthesis 1378 if attached_temporal_labels: 1379 logger.info("🧠 Detaching temporal journal blocks after synthesis") 1380 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels) 1381 if not detach_success: 1382 logger.warning("Some temporal blocks may not have been detached properly") 1383 1384 1385def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: 1386 """ 1387 Detach all user blocks from the agent to prevent memory bloat. 1388 This should be called periodically to ensure clean state. 1389 """ 1390 try: 1391 # Get all blocks attached to the agent 1392 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1393 1394 user_blocks_to_detach = [] 1395 for block in attached_blocks: 1396 if hasattr(block, 'label') and block.label.startswith('user_'): 1397 user_blocks_to_detach.append({ 1398 'label': block.label, 1399 'id': block.id 1400 }) 1401 1402 if not user_blocks_to_detach: 1403 logger.debug("No user blocks found to detach during periodic cleanup") 1404 return 1405 1406 # Detach each user block 1407 detached_count = 0 1408 for block_info in user_blocks_to_detach: 1409 try: 1410 client.agents.blocks.detach( 1411 agent_id=agent_id, 1412 block_id=str(block_info['id']) 1413 ) 1414 detached_count += 1 1415 logger.debug(f"Detached user block: {block_info['label']}") 1416 except Exception as e: 1417 logger.warning(f"Failed to detach block {block_info['label']}: {e}") 1418 1419 if detached_count > 0: 1420 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") 1421 1422 except Exception as e: 1423 logger.error(f"Error during periodic user block cleanup: {e}") 1424 1425 1426def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple: 1427 """ 1428 Attach temporal journal blocks (day, month, year) to the agent for synthesis. 1429 Creates blocks if they don't exist. 1430 1431 Returns: 1432 Tuple of (success: bool, attached_labels: list) 1433 """ 1434 try: 1435 today = date.today() 1436 1437 # Generate temporal block labels 1438 day_label = f"void_day_{today.strftime('%Y_%m_%d')}" 1439 month_label = f"void_month_{today.strftime('%Y_%m')}" 1440 year_label = f"void_year_{today.year}" 1441 1442 temporal_labels = [day_label, month_label, year_label] 1443 attached_labels = [] 1444 1445 # Get current blocks attached to agent 1446 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1447 current_block_labels = {block.label for block in current_blocks} 1448 current_block_ids = {str(block.id) for block in current_blocks} 1449 1450 for label in temporal_labels: 1451 try: 1452 # Skip if already attached 1453 if label in current_block_labels: 1454 logger.debug(f"Temporal block already attached: {label}") 1455 attached_labels.append(label) 1456 continue 1457 1458 # Check if block exists globally 1459 blocks = client.blocks.list(label=label) 1460 1461 if blocks and len(blocks) > 0: 1462 block = blocks[0] 1463 # Check if already attached by ID 1464 if str(block.id) in current_block_ids: 1465 logger.debug(f"Temporal block already attached by ID: {label}") 1466 attached_labels.append(label) 1467 continue 1468 else: 1469 # Create new temporal block with appropriate header 1470 if "day" in label: 1471 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}" 1472 initial_content = f"{header}\n\nNo entries yet for today." 1473 elif "month" in label: 1474 header = f"# Monthly Journal - {today.strftime('%B %Y')}" 1475 initial_content = f"{header}\n\nNo entries yet for this month." 1476 else: # year 1477 header = f"# Yearly Journal - {today.year}" 1478 initial_content = f"{header}\n\nNo entries yet for this year." 1479 1480 block = client.blocks.create( 1481 label=label, 1482 value=initial_content, 1483 limit=10000 # Larger limit for journal blocks 1484 ) 1485 logger.info(f"Created new temporal block: {label}") 1486 1487 # Attach the block 1488 client.agents.blocks.attach( 1489 agent_id=agent_id, 1490 block_id=str(block.id) 1491 ) 1492 attached_labels.append(label) 1493 logger.info(f"Attached temporal block: {label}") 1494 1495 except Exception as e: 1496 # Check for duplicate constraint errors 1497 error_str = str(e) 1498 if "duplicate key value violates unique constraint" in error_str: 1499 logger.debug(f"Temporal block already attached (constraint): {label}") 1500 attached_labels.append(label) 1501 else: 1502 logger.warning(f"Failed to attach temporal block {label}: {e}") 1503 1504 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}") 1505 return True, attached_labels 1506 1507 except Exception as e: 1508 logger.error(f"Error attaching temporal blocks: {e}") 1509 return False, [] 1510 1511 1512def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool: 1513 """ 1514 Detach temporal journal blocks from the agent after synthesis. 1515 1516 Args: 1517 client: Letta client 1518 agent_id: Agent ID 1519 labels_to_detach: Optional list of specific labels to detach. 1520 If None, detaches all temporal blocks. 1521 1522 Returns: 1523 bool: Success status 1524 """ 1525 try: 1526 # If no specific labels provided, generate today's labels 1527 if labels_to_detach is None: 1528 today = date.today() 1529 labels_to_detach = [ 1530 f"void_day_{today.strftime('%Y_%m_%d')}", 1531 f"void_month_{today.strftime('%Y_%m')}", 1532 f"void_year_{today.year}" 1533 ] 1534 1535 # Get current blocks and build label to ID mapping 1536 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1537 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 1538 1539 detached_count = 0 1540 for label in labels_to_detach: 1541 if label in block_label_to_id: 1542 try: 1543 client.agents.blocks.detach( 1544 agent_id=agent_id, 1545 block_id=block_label_to_id[label] 1546 ) 1547 detached_count += 1 1548 logger.debug(f"Detached temporal block: {label}") 1549 except Exception as e: 1550 logger.warning(f"Failed to detach temporal block {label}: {e}") 1551 else: 1552 logger.debug(f"Temporal block not attached: {label}") 1553 1554 logger.info(f"Detached {detached_count} temporal blocks") 1555 return True 1556 1557 except Exception as e: 1558 logger.error(f"Error detaching temporal blocks: {e}") 1559 return False 1560 1561 1562def main(): 1563 # Parse command line arguments 1564 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1565 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1566 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1567 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1568 # --rich option removed as we now use simple text formatting 1569 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1570 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1571 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1572 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1573 args = parser.parse_args() 1574 1575 # Configure logging based on command line arguments 1576 if args.simple_logs: 1577 log_format = "void - %(levelname)s - %(message)s" 1578 else: 1579 # Create custom formatter with symbols 1580 class SymbolFormatter(logging.Formatter): 1581 """Custom formatter that adds symbols for different log levels""" 1582 1583 SYMBOLS = { 1584 logging.DEBUG: '', 1585 logging.INFO: '', 1586 logging.WARNING: '', 1587 logging.ERROR: '', 1588 logging.CRITICAL: '' 1589 } 1590 1591 def format(self, record): 1592 # Get the symbol for this log level 1593 symbol = self.SYMBOLS.get(record.levelno, '') 1594 1595 # Format time as HH:MM:SS 1596 timestamp = self.formatTime(record, "%H:%M:%S") 1597 1598 # Build the formatted message 1599 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1600 1601 # Use vertical bar as separator 1602 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1603 1604 return ' '.join(parts) 1605 1606 # Reset logging configuration 1607 for handler in logging.root.handlers[:]: 1608 logging.root.removeHandler(handler) 1609 1610 # Create handler with custom formatter 1611 handler = logging.StreamHandler() 1612 if not args.simple_logs: 1613 handler.setFormatter(SymbolFormatter()) 1614 else: 1615 handler.setFormatter(logging.Formatter(log_format)) 1616 1617 # Configure root logger 1618 logging.root.setLevel(logging.INFO) 1619 logging.root.addHandler(handler) 1620 1621 global logger, prompt_logger, console 1622 logger = logging.getLogger("void_bot") 1623 logger.setLevel(logging.INFO) 1624 1625 # Create a separate logger for prompts (set to WARNING to hide by default) 1626 prompt_logger = logging.getLogger("void_bot.prompts") 1627 if args.reasoning: 1628 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1629 else: 1630 prompt_logger.setLevel(logging.WARNING) # Hide by default 1631 1632 # Disable httpx logging completely 1633 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1634 1635 # Create Rich console for pretty printing 1636 # Console no longer used - simple text formatting 1637 1638 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1639 TESTING_MODE = args.test 1640 1641 # Store no-git flag globally for use in export_agent_state calls 1642 SKIP_GIT = args.no_git 1643 1644 # Store rich flag globally 1645 # Rich formatting no longer used 1646 1647 # Store reasoning flag globally 1648 SHOW_REASONING = args.reasoning 1649 1650 if TESTING_MODE: 1651 logger.info("=== RUNNING IN TESTING MODE ===") 1652 logger.info(" - No messages will be sent to Bluesky") 1653 logger.info(" - Queue files will not be deleted") 1654 logger.info(" - Notifications will not be marked as seen") 1655 print("\n") 1656 1657 # Check for synthesis-only mode 1658 SYNTHESIS_ONLY = args.synthesis_only 1659 if SYNTHESIS_ONLY: 1660 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1661 logger.info(" - Only synthesis messages will be sent") 1662 logger.info(" - No notification processing") 1663 logger.info(" - No Bluesky client needed") 1664 print("\n") 1665 """Main bot loop that continuously monitors for notifications.""" 1666 global start_time 1667 start_time = time.time() 1668 logger.info("=== STARTING VOID BOT ===") 1669 void_agent = initialize_void() 1670 logger.info(f"Void agent initialized: {void_agent.id}") 1671 1672 # Ensure correct tools are attached for Bluesky 1673 logger.info("Configuring tools for Bluesky platform...") 1674 try: 1675 from tool_manager import ensure_platform_tools 1676 ensure_platform_tools('bluesky', void_agent.id) 1677 except Exception as e: 1678 logger.error(f"Failed to configure platform tools: {e}") 1679 logger.warning("Continuing with existing tool configuration") 1680 1681 # Check if agent has required tools 1682 if hasattr(void_agent, 'tools') and void_agent.tools: 1683 tool_names = [tool.name for tool in void_agent.tools] 1684 # Check for bluesky-related tools 1685 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1686 if not bluesky_tools: 1687 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1688 else: 1689 logger.warning("Agent has no tools registered!") 1690 1691 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1692 if not SYNTHESIS_ONLY: 1693 atproto_client = bsky_utils.default_login() 1694 logger.info("Connected to Bluesky") 1695 else: 1696 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 1697 if not args.test: 1698 atproto_client = bsky_utils.default_login() 1699 logger.info("Connected to Bluesky (for synthesis acks/posts)") 1700 else: 1701 atproto_client = None 1702 logger.info("Skipping Bluesky connection (test mode)") 1703 1704 # Configure intervals 1705 CLEANUP_INTERVAL = args.cleanup_interval 1706 SYNTHESIS_INTERVAL = args.synthesis_interval 1707 1708 # Synthesis-only mode 1709 if SYNTHESIS_ONLY: 1710 if SYNTHESIS_INTERVAL <= 0: 1711 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 1712 return 1713 1714 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1715 1716 while True: 1717 try: 1718 # Send synthesis message immediately on first run 1719 logger.info("🧠 Sending synthesis message") 1720 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1721 1722 # Wait for next interval 1723 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 1724 sleep(SYNTHESIS_INTERVAL) 1725 1726 except KeyboardInterrupt: 1727 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 1728 break 1729 except Exception as e: 1730 logger.error(f"Error in synthesis loop: {e}") 1731 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 1732 sleep(SYNTHESIS_INTERVAL) 1733 1734 # Normal mode with notification processing 1735 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1736 1737 cycle_count = 0 1738 1739 if CLEANUP_INTERVAL > 0: 1740 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 1741 else: 1742 logger.info("User block cleanup disabled") 1743 1744 if SYNTHESIS_INTERVAL > 0: 1745 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1746 else: 1747 logger.info("Synthesis messages disabled") 1748 1749 while True: 1750 try: 1751 cycle_count += 1 1752 process_notifications(void_agent, atproto_client, TESTING_MODE) 1753 1754 # Check if synthesis interval has passed 1755 if SYNTHESIS_INTERVAL > 0: 1756 current_time = time.time() 1757 global last_synthesis_time 1758 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 1759 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 1760 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1761 last_synthesis_time = current_time 1762 1763 # Run periodic cleanup every N cycles 1764 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1765 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1766 periodic_user_block_cleanup(CLIENT, void_agent.id) 1767 1768 # Log cycle completion with stats 1769 elapsed_time = time.time() - start_time 1770 total_messages = sum(message_counters.values()) 1771 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1772 1773 if total_messages > 0: 1774 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1775 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1776 1777 except KeyboardInterrupt: 1778 # Final stats 1779 elapsed_time = time.time() - start_time 1780 total_messages = sum(message_counters.values()) 1781 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1782 1783 logger.info("=== BOT STOPPED BY USER ===") 1784 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1785 logger.info(f" - {message_counters['mentions']} mentions") 1786 logger.info(f" - {message_counters['replies']} replies") 1787 logger.info(f" - {message_counters['follows']} follows") 1788 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1789 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1790 break 1791 except Exception as e: 1792 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1793 logger.error(f"Error details: {e}") 1794 # Wait a bit longer on errors 1795 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1796 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1797 1798 1799if __name__ == "__main__": 1800 main()