a digital person for bluesky
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20from config_loader import get_letta_config, get_config, get_queue_config 21 22import bsky_utils 23from tools.blocks import attach_user_blocks, detach_user_blocks 24from datetime import date 25from notification_db import NotificationDB 26 27def extract_handles_from_data(data): 28 """Recursively extract all unique handles from nested data structure.""" 29 handles = set() 30 31 def _extract_recursive(obj): 32 if isinstance(obj, dict): 33 # Check if this dict has a 'handle' key 34 if 'handle' in obj: 35 handles.add(obj['handle']) 36 # Recursively check all values 37 for value in obj.values(): 38 _extract_recursive(value) 39 elif isinstance(obj, list): 40 # Recursively check all list items 41 for item in obj: 42 _extract_recursive(item) 43 44 _extract_recursive(data) 45 return list(handles) 46 47# Logging will be configured after argument parsing 48logger = None 49prompt_logger = None 50# Simple text formatting (Rich no longer used) 51SHOW_REASONING = False 52last_archival_query = "archival memory search" 53 54def log_with_panel(message, title=None, border_color="white"): 55 """Log a message with Unicode box-drawing characters""" 56 if title: 57 # Map old color names to appropriate symbols 58 symbol_map = { 59 "blue": "", # Tool calls 60 "green": "", # Success/completion 61 "yellow": "", # Reasoning 62 "red": "", # Errors 63 "white": "", # Default/mentions 64 "cyan": "", # Posts 65 } 66 symbol = symbol_map.get(border_color, "") 67 68 print(f"\n{symbol} {title}") 69 print(f" {'' * len(title)}") 70 # Indent message lines 71 for line in message.split('\n'): 72 print(f" {line}") 73 else: 74 print(message) 75 76 77# Load Letta configuration from config.yaml (will be initialized later with custom path if provided) 78letta_config = None 79CLIENT = None 80 81# Notification check delay 82FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 83 84# Check for new notifications every N queue items 85CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 86 87# Queue paths (will be initialized from config in main()) 88QUEUE_DIR = None 89QUEUE_ERROR_DIR = None 90QUEUE_NO_REPLY_DIR = None 91PROCESSED_NOTIFICATIONS_FILE = None 92 93# Maximum number of processed notifications to track 94MAX_PROCESSED_NOTIFICATIONS = 10000 95 96# Message tracking counters 97message_counters = defaultdict(int) 98start_time = time.time() 99 100# Testing mode flag 101TESTING_MODE = False 102 103# Skip git operations flag 104SKIP_GIT = False 105 106# Synthesis message tracking 107last_synthesis_time = time.time() 108 109# Database for notification tracking 110NOTIFICATION_DB = None 111 112def export_agent_state(client, agent, skip_git=False): 113 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 114 try: 115 # Confirm export with user unless git is being skipped 116 if not skip_git: 117 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 118 if response not in ['y', 'yes']: 119 logger.info("Agent export cancelled by user.") 120 return 121 else: 122 logger.info("Exporting agent state (git staging disabled)") 123 124 # Create directories if they don't exist 125 os.makedirs("agent_archive", exist_ok=True) 126 os.makedirs("agents", exist_ok=True) 127 128 # Export agent data 129 logger.info(f"Exporting agent {agent.id}. This takes some time...") 130 agent_data = client.agents.export_file(agent_id=agent.id) 131 132 # Save timestamped archive copy 133 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 134 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 135 with open(archive_file, 'w', encoding='utf-8') as f: 136 json.dump(agent_data, f, indent=2, ensure_ascii=False) 137 138 # Save current agent state 139 current_file = os.path.join("agents", "void.af") 140 with open(current_file, 'w', encoding='utf-8') as f: 141 json.dump(agent_data, f, indent=2, ensure_ascii=False) 142 143 logger.info(f"Agent exported to {archive_file} and {current_file}") 144 145 # Git add only the current agent file (archive is ignored) unless skip_git is True 146 if not skip_git: 147 try: 148 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 149 logger.info("Added current agent file to git staging") 150 except subprocess.CalledProcessError as e: 151 logger.warning(f"Failed to git add agent file: {e}") 152 153 except Exception as e: 154 logger.error(f"Failed to export agent: {e}") 155 156def initialize_void(): 157 logger.info("Starting void agent initialization...") 158 159 # Get the configured void agent by ID 160 logger.info("Loading void agent from config...") 161 agent_id = letta_config['agent_id'] 162 163 try: 164 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 165 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 166 except Exception as e: 167 logger.error(f"Failed to load void agent {agent_id}: {e}") 168 logger.error("Please ensure the agent_id in config.yaml is correct") 169 raise e 170 171 # Export agent state 172 logger.info("Exporting agent state...") 173 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 174 175 # Log agent details 176 logger.info(f"Void agent details - ID: {void_agent.id}") 177 logger.info(f"Agent name: {void_agent.name}") 178 if hasattr(void_agent, 'llm_config'): 179 logger.info(f"Agent model: {void_agent.llm_config.model}") 180 if hasattr(void_agent, 'project_id') and void_agent.project_id: 181 logger.info(f"Agent project_id: {void_agent.project_id}") 182 if hasattr(void_agent, 'tools'): 183 logger.info(f"Agent has {len(void_agent.tools)} tools") 184 for tool in void_agent.tools[:3]: # Show first 3 tools 185 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 186 187 return void_agent 188 189 190def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 191 """Process a mention and generate a reply using the Letta agent. 192 193 Args: 194 void_agent: The Letta agent instance 195 atproto_client: The AT Protocol client 196 notification_data: The notification data dictionary 197 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 198 199 Returns: 200 True: Successfully processed, remove from queue 201 False: Failed but retryable, keep in queue 202 None: Failed with non-retryable error, move to errors directory 203 "no_reply": No reply was generated, move to no_reply directory 204 """ 205 import uuid 206 207 # Generate correlation ID for tracking this notification through the pipeline 208 correlation_id = str(uuid.uuid4())[:8] 209 210 try: 211 logger.info(f"[{correlation_id}] Starting process_mention", extra={ 212 'correlation_id': correlation_id, 213 'notification_type': type(notification_data).__name__ 214 }) 215 216 # Handle both dict and object inputs for backwards compatibility 217 if isinstance(notification_data, dict): 218 uri = notification_data['uri'] 219 mention_text = notification_data.get('record', {}).get('text', '') 220 author_handle = notification_data['author']['handle'] 221 author_name = notification_data['author'].get('display_name') or author_handle 222 else: 223 # Legacy object access 224 uri = notification_data.uri 225 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 226 author_handle = notification_data.author.handle 227 author_name = notification_data.author.display_name or author_handle 228 229 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 230 'correlation_id': correlation_id, 231 'author_handle': author_handle, 232 'author_name': author_name, 233 'mention_uri': uri, 234 'mention_text_length': len(mention_text), 235 'mention_preview': mention_text[:100] if mention_text else '' 236 }) 237 238 # Retrieve the entire thread associated with the mention 239 try: 240 thread = atproto_client.app.bsky.feed.get_post_thread({ 241 'uri': uri, 242 'parent_height': 40, 243 'depth': 10 244 }) 245 except Exception as e: 246 error_str = str(e) 247 # Check if this is a NotFound error 248 if 'NotFound' in error_str or 'Post not found' in error_str: 249 logger.warning(f"Post not found for URI {uri}, removing from queue") 250 return True # Return True to remove from queue 251 else: 252 # Re-raise other errors 253 logger.error(f"Error fetching thread: {e}") 254 raise 255 256 # Get thread context as YAML string 257 logger.debug("Converting thread to YAML string") 258 try: 259 thread_context = thread_to_yaml_string(thread) 260 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 261 262 # Check if #voidstop appears anywhere in the thread 263 if "#voidstop" in thread_context.lower(): 264 logger.info("Found #voidstop in thread context, skipping this mention") 265 return True # Return True to remove from queue 266 267 # Also check the mention text directly 268 if "#voidstop" in mention_text.lower(): 269 logger.info("Found #voidstop in mention text, skipping this mention") 270 return True # Return True to remove from queue 271 272 # Create a more informative preview by extracting meaningful content 273 lines = thread_context.split('\n') 274 meaningful_lines = [] 275 276 for line in lines: 277 stripped = line.strip() 278 if not stripped: 279 continue 280 281 # Look for lines with actual content (not just structure) 282 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 283 meaningful_lines.append(line) 284 if len(meaningful_lines) >= 5: 285 break 286 287 if meaningful_lines: 288 preview = '\n'.join(meaningful_lines) 289 logger.debug(f"Thread content preview:\n{preview}") 290 else: 291 # If no content fields found, just show it's a thread structure 292 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 293 except Exception as yaml_error: 294 import traceback 295 logger.error(f"Error converting thread to YAML: {yaml_error}") 296 logger.error(f"Full traceback:\n{traceback.format_exc()}") 297 logger.error(f"Thread type: {type(thread)}") 298 if hasattr(thread, '__dict__'): 299 logger.error(f"Thread attributes: {thread.__dict__}") 300 # Try to continue with a simple context 301 thread_context = f"Error processing thread context: {str(yaml_error)}" 302 303 # Create a prompt for the Letta agent with thread context 304 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 305 306MOST RECENT POST: 307"{mention_text}" 308 309THREAD CONTEXT: 310```yaml 311{thread_context} 312``` 313 314If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call creates one post (max 300 characters). You may use multiple calls to create a thread if needed.""" 315 316 # Extract all handles from notification and thread data 317 all_handles = set() 318 all_handles.update(extract_handles_from_data(notification_data)) 319 all_handles.update(extract_handles_from_data(thread.model_dump())) 320 unique_handles = list(all_handles) 321 322 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 323 324 # Check if any handles are in known_bots list 325 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 326 import json 327 328 try: 329 # Check for known bots in thread 330 bot_check_result = check_known_bots(unique_handles, void_agent) 331 bot_check_data = json.loads(bot_check_result) 332 333 # TEMPORARILY DISABLED: Bot detection causing issues with normal users 334 # TODO: Re-enable after debugging why normal users are being flagged as bots 335 if False: # bot_check_data.get("bot_detected", False): 336 detected_bots = bot_check_data.get("detected_bots", []) 337 logger.info(f"Bot detected in thread: {detected_bots}") 338 339 # Decide whether to respond (10% chance) 340 if not should_respond_to_bot_thread(): 341 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 342 # Return False to keep in queue for potential later processing 343 return False 344 else: 345 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 346 else: 347 logger.debug("Bot detection disabled - processing all notifications") 348 349 except Exception as bot_check_error: 350 logger.warning(f"Error checking for bots: {bot_check_error}") 351 # Continue processing if bot check fails 352 353 # Attach user blocks before agent call 354 attached_handles = [] 355 if unique_handles: 356 try: 357 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 358 attach_result = attach_user_blocks(unique_handles, void_agent) 359 attached_handles = unique_handles # Track successfully attached handles 360 logger.debug(f"Attach result: {attach_result}") 361 except Exception as attach_error: 362 logger.warning(f"Failed to attach user blocks: {attach_error}") 363 # Continue without user blocks rather than failing completely 364 365 # Get response from Letta agent 366 # Format with Unicode characters 367 title = f"MENTION FROM @{author_handle}" 368 print(f"\n{title}") 369 print(f" {'' * len(title)}") 370 # Indent the mention text 371 for line in mention_text.split('\n'): 372 print(f" {line}") 373 374 # Log prompt details to separate logger 375 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 376 377 # Log concise prompt info to main logger 378 thread_handles_count = len(unique_handles) 379 prompt_char_count = len(prompt) 380 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 381 382 try: 383 # Use streaming to avoid 524 timeout errors 384 message_stream = CLIENT.agents.messages.create_stream( 385 agent_id=void_agent.id, 386 messages=[{"role": "user", "content": prompt}], 387 stream_tokens=False, # Step streaming only (faster than token streaming) 388 max_steps=100 389 ) 390 391 # Collect the streaming response 392 all_messages = [] 393 for chunk in message_stream: 394 # Log condensed chunk info 395 if hasattr(chunk, 'message_type'): 396 if chunk.message_type == 'reasoning_message': 397 # Show full reasoning without truncation 398 if SHOW_REASONING: 399 # Format with Unicode characters 400 print("\n◆ Reasoning") 401 print(" ─────────") 402 # Indent reasoning lines 403 for line in chunk.reasoning.split('\n'): 404 print(f" {line}") 405 else: 406 # Default log format (only when --reasoning is used due to log level) 407 # Format with Unicode characters 408 print("\n◆ Reasoning") 409 print(" ─────────") 410 # Indent reasoning lines 411 for line in chunk.reasoning.split('\n'): 412 print(f" {line}") 413 414 # Create ATProto record for reasoning (unless in testing mode) 415 if not testing_mode and hasattr(chunk, 'reasoning'): 416 try: 417 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 418 except Exception as e: 419 logger.debug(f"Failed to create reasoning record: {e}") 420 elif chunk.message_type == 'tool_call_message': 421 # Parse tool arguments for better display 422 tool_name = chunk.tool_call.name 423 424 # Create ATProto record for tool call (unless in testing mode) 425 if not testing_mode: 426 try: 427 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 428 bsky_utils.create_tool_call_record( 429 atproto_client, 430 tool_name, 431 chunk.tool_call.arguments, 432 tool_call_id 433 ) 434 except Exception as e: 435 logger.debug(f"Failed to create tool call record: {e}") 436 437 try: 438 args = json.loads(chunk.tool_call.arguments) 439 # Format based on tool type 440 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 441 # Extract the text being posted 442 text = args.get('text', '') 443 if text: 444 # Format with Unicode characters 445 print("\n✎ Bluesky Post") 446 print(" ────────────") 447 # Indent post text 448 for line in text.split('\n'): 449 print(f" {line}") 450 else: 451 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 452 elif tool_name == 'archival_memory_search': 453 query = args.get('query', 'unknown') 454 global last_archival_query 455 last_archival_query = query 456 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 457 elif tool_name == 'archival_memory_insert': 458 content = args.get('content', '') 459 # Show the full content being inserted 460 log_with_panel(content, f"Tool call: {tool_name}", "blue") 461 elif tool_name == 'update_block': 462 label = args.get('label', 'unknown') 463 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 464 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 465 else: 466 # Generic display for other tools 467 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 468 if len(args_str) > 150: 469 args_str = args_str[:150] + "..." 470 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 471 except: 472 # Fallback to original format if parsing fails 473 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 474 elif chunk.message_type == 'tool_return_message': 475 # Enhanced tool result logging 476 tool_name = chunk.name 477 status = chunk.status 478 479 if status == 'success': 480 # Try to show meaningful result info based on tool type 481 if hasattr(chunk, 'tool_return') and chunk.tool_return: 482 result_str = str(chunk.tool_return) 483 if tool_name == 'archival_memory_search': 484 485 try: 486 # Handle both string and list formats 487 if isinstance(chunk.tool_return, str): 488 # The string format is: "([{...}, {...}], count)" 489 # We need to extract just the list part 490 if chunk.tool_return.strip(): 491 # Find the list part between the first [ and last ] 492 start_idx = chunk.tool_return.find('[') 493 end_idx = chunk.tool_return.rfind(']') 494 if start_idx != -1 and end_idx != -1: 495 list_str = chunk.tool_return[start_idx:end_idx+1] 496 # Use ast.literal_eval since this is Python literal syntax, not JSON 497 import ast 498 results = ast.literal_eval(list_str) 499 else: 500 logger.warning("Could not find list in archival_memory_search result") 501 results = [] 502 else: 503 logger.warning("Empty string returned from archival_memory_search") 504 results = [] 505 else: 506 # If it's already a list, use directly 507 results = chunk.tool_return 508 509 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 510 511 # Use the captured search query from the tool call 512 search_query = last_archival_query 513 514 # Combine all results into a single text block 515 content_text = "" 516 for i, entry in enumerate(results, 1): 517 timestamp = entry.get('timestamp', 'N/A') 518 content = entry.get('content', '') 519 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 520 521 # Format with Unicode characters 522 title = f"{search_query} ({len(results)} results)" 523 print(f"\n{title}") 524 print(f" {'' * len(title)}") 525 # Indent content text 526 for line in content_text.strip().split('\n'): 527 print(f" {line}") 528 529 except Exception as e: 530 logger.error(f"Error formatting archival memory results: {e}") 531 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 532 elif tool_name == 'add_post_to_bluesky_reply_thread': 533 # Just show success for bluesky posts, the text was already shown in tool call 534 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 535 elif tool_name == 'archival_memory_insert': 536 # Skip archival memory insert results (always returns None) 537 pass 538 elif tool_name == 'update_block': 539 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 540 else: 541 # Generic success with preview 542 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 543 log_with_panel(preview, f"Tool result: {tool_name}", "green") 544 else: 545 log_with_panel("Success", f"Tool result: {tool_name}", "green") 546 elif status == 'error': 547 # Show error details 548 if tool_name == 'add_post_to_bluesky_reply_thread': 549 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 550 log_with_panel(error_str, f"Bluesky Post ✗", "red") 551 elif tool_name == 'archival_memory_insert': 552 # Skip archival memory insert errors too 553 pass 554 else: 555 error_preview = "" 556 if hasattr(chunk, 'tool_return') and chunk.tool_return: 557 error_str = str(chunk.tool_return) 558 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 559 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 560 else: 561 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 562 else: 563 logger.info(f"Tool result: {tool_name} - {status}") 564 elif chunk.message_type == 'assistant_message': 565 # Format with Unicode characters 566 print("\n▶ Assistant Response") 567 print(" ──────────────────") 568 # Indent response text 569 for line in chunk.content.split('\n'): 570 print(f" {line}") 571 else: 572 # Filter out verbose message types 573 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 574 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 575 else: 576 logger.info(f"📦 Stream status: {chunk}") 577 578 # Log full chunk for debugging 579 logger.debug(f"Full streaming chunk: {chunk}") 580 all_messages.append(chunk) 581 if str(chunk) == 'done': 582 break 583 584 # Convert streaming response to standard format for compatibility 585 message_response = type('StreamingResponse', (), { 586 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 587 })() 588 except Exception as api_error: 589 import traceback 590 error_str = str(api_error) 591 logger.error(f"Letta API error: {api_error}") 592 logger.error(f"Error type: {type(api_error).__name__}") 593 logger.error(f"Full traceback:\n{traceback.format_exc()}") 594 logger.error(f"Mention text was: {mention_text}") 595 logger.error(f"Author: @{author_handle}") 596 logger.error(f"URI: {uri}") 597 598 599 # Try to extract more info from different error types 600 if hasattr(api_error, 'response'): 601 logger.error(f"Error response object exists") 602 if hasattr(api_error.response, 'text'): 603 logger.error(f"Response text: {api_error.response.text}") 604 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 605 try: 606 logger.error(f"Response JSON: {api_error.response.json()}") 607 except: 608 pass 609 610 # Check for specific error types 611 if hasattr(api_error, 'status_code'): 612 logger.error(f"API Status code: {api_error.status_code}") 613 if hasattr(api_error, 'body'): 614 logger.error(f"API Response body: {api_error.body}") 615 if hasattr(api_error, 'headers'): 616 logger.error(f"API Response headers: {api_error.headers}") 617 618 if api_error.status_code == 413: 619 logger.error("413 Payload Too Large - moving to errors directory") 620 return None # Move to errors directory - payload is too large to ever succeed 621 elif api_error.status_code == 524: 622 logger.error("524 error - timeout from Cloudflare, will retry later") 623 return False # Keep in queue for retry 624 625 # Check if error indicates we should remove from queue 626 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 627 logger.warning("Payload too large error, moving to errors directory") 628 return None # Move to errors directory - cannot be fixed by retry 629 elif 'status_code: 524' in error_str: 630 logger.warning("524 timeout error, keeping in queue for retry") 631 return False # Keep in queue for retry 632 633 raise 634 635 # Log successful response 636 logger.debug("Successfully received response from Letta API") 637 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 638 639 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 640 reply_candidates = [] 641 tool_call_results = {} # Map tool_call_id to status 642 ack_note = None # Track any note from annotate_ack tool 643 flagged_memories = [] # Track memories flagged for deletion 644 645 logger.debug(f"Processing {len(message_response.messages)} response messages...") 646 647 # First pass: collect tool return statuses 648 ignored_notification = False 649 ignore_reason = "" 650 ignore_category = "" 651 652 logger.debug(f"Processing {len(message_response.messages)} messages from agent") 653 654 for message in message_response.messages: 655 # Log detailed message attributes for debugging 656 msg_type = getattr(message, 'message_type', 'unknown') 657 has_tool_call_id = hasattr(message, 'tool_call_id') 658 has_status = hasattr(message, 'status') 659 has_tool_return = hasattr(message, 'tool_return') 660 661 logger.debug(f"Message type={msg_type}, has_tool_call_id={has_tool_call_id}, has_status={has_status}, has_tool_return={has_tool_return}") 662 663 # Tool return messages are identified by having tool_return attribute, tool_call_id, and status 664 if has_tool_call_id and has_status and has_tool_return: 665 logger.debug(f" -> tool_call_id={message.tool_call_id}, status={message.status}") 666 667 # Store the result for ANY tool that has a return - we'll match by tool_call_id later 668 tool_call_results[message.tool_call_id] = message.status 669 logger.debug(f"Stored tool result: {message.tool_call_id} -> {message.status}") 670 671 # Handle special processing for ignore_notification 672 if message.status == 'success': 673 result_str = str(message.tool_return) if message.tool_return else "" 674 if 'IGNORED_NOTIFICATION::' in result_str: 675 parts = result_str.split('::') 676 if len(parts) >= 3: 677 ignore_category = parts[1] 678 ignore_reason = parts[2] 679 ignored_notification = True 680 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 681 682 # Check for deprecated tool in tool call messages 683 elif hasattr(message, 'tool_call') and message.tool_call: 684 if message.tool_call.name == 'bluesky_reply': 685 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 686 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 687 logger.error("Update the agent's tools using register_tools.py") 688 # Export agent state before terminating 689 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 690 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 691 exit(1) 692 693 logger.debug(f"First pass complete. Collected {len(tool_call_results)} tool call results") 694 logger.debug(f"tool_call_results: {tool_call_results}") 695 696 # Second pass: process messages and check for successful tool calls 697 for i, message in enumerate(message_response.messages, 1): 698 # Log concise message info instead of full object 699 msg_type = getattr(message, 'message_type', 'unknown') 700 if hasattr(message, 'reasoning') and message.reasoning: 701 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 702 elif hasattr(message, 'tool_call') and message.tool_call: 703 tool_name = message.tool_call.name 704 logger.debug(f" {i}. {msg_type}: {tool_name}") 705 elif hasattr(message, 'tool_return'): 706 tool_name = getattr(message, 'name', 'unknown_tool') 707 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 708 status = getattr(message, 'status', 'unknown') 709 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 710 elif hasattr(message, 'text'): 711 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 712 else: 713 logger.debug(f" {i}. {msg_type}: <no content>") 714 715 # Check for halt_activity tool call 716 if hasattr(message, 'tool_call') and message.tool_call: 717 if message.tool_call.name == 'halt_activity': 718 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 719 try: 720 args = json.loads(message.tool_call.arguments) 721 reason = args.get('reason', 'Agent requested halt') 722 logger.info(f"Halt reason: {reason}") 723 except: 724 logger.info("Halt reason: <unable to parse>") 725 726 # Delete the queue file before terminating 727 if queue_filepath and queue_filepath.exists(): 728 queue_filepath.unlink() 729 logger.info(f"Deleted queue file: {queue_filepath.name}") 730 731 # Also mark as processed to avoid reprocessing 732 if NOTIFICATION_DB: 733 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed') 734 else: 735 processed_uris = load_processed_notifications() 736 processed_uris.add(notification_data.get('uri', '')) 737 save_processed_notifications(processed_uris) 738 739 # Export agent state before terminating 740 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 741 742 # Exit the program 743 logger.info("=== BOT TERMINATED BY AGENT ===") 744 exit(0) 745 746 # Check for deprecated bluesky_reply tool 747 if hasattr(message, 'tool_call') and message.tool_call: 748 if message.tool_call.name == 'bluesky_reply': 749 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 750 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 751 logger.error("Update the agent's tools using register_tools.py") 752 # Export agent state before terminating 753 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 754 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 755 exit(1) 756 757 # Collect annotate_ack tool calls 758 elif message.tool_call.name == 'annotate_ack': 759 try: 760 args = json.loads(message.tool_call.arguments) 761 note = args.get('note', '') 762 if note: 763 ack_note = note 764 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 765 except json.JSONDecodeError as e: 766 logger.error(f"Failed to parse annotate_ack arguments: {e}") 767 768 # Collect flag_archival_memory_for_deletion tool calls 769 elif message.tool_call.name == 'flag_archival_memory_for_deletion': 770 try: 771 args = json.loads(message.tool_call.arguments) 772 reason = args.get('reason', '') 773 memory_text = args.get('memory_text', '') 774 confirm = args.get('confirm', False) 775 776 # Only flag for deletion if confirmed and has all required fields 777 if confirm and memory_text and reason: 778 flagged_memories.append({ 779 'reason': reason, 780 'memory_text': memory_text 781 }) 782 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...") 783 elif not confirm: 784 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...") 785 elif not reason: 786 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...") 787 except json.JSONDecodeError as e: 788 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}") 789 790 # Collect archival_memory_insert tool calls for recording to AT Protocol 791 elif message.tool_call.name == 'archival_memory_insert': 792 try: 793 args = json.loads(message.tool_call.arguments) 794 content = args.get('content', '') 795 tags_str = args.get('tags', None) 796 797 # Parse tags from string representation if present 798 tags = None 799 if tags_str: 800 try: 801 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str 802 except json.JSONDecodeError: 803 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...") 804 805 if content: 806 # Create stream.thought.memory record 807 try: 808 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags) 809 if memory_result: 810 tags_info = f" ({len(tags)} tags)" if tags else "" 811 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}: {content[:100]}...") 812 else: 813 logger.warning(f"Failed to record archival memory to AT Protocol") 814 except Exception as e: 815 logger.error(f"Error creating memory record: {e}") 816 except json.JSONDecodeError as e: 817 logger.error(f"Failed to parse archival_memory_insert arguments: {e}") 818 819 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 820 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 821 tool_call_id = message.tool_call.tool_call_id 822 tool_status = tool_call_results.get(tool_call_id, 'unknown') 823 824 logger.debug(f"Found add_post_to_bluesky_reply_thread tool call: id={tool_call_id}, status={tool_status}") 825 logger.debug(f"Available tool_call_results: {tool_call_results}") 826 827 if tool_status == 'success': 828 try: 829 args = json.loads(message.tool_call.arguments) 830 reply_text = args.get('text', '') 831 reply_lang = args.get('lang', 'en-US') 832 833 if reply_text: # Only add if there's actual content 834 reply_candidates.append((reply_text, reply_lang)) 835 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 836 except json.JSONDecodeError as e: 837 logger.error(f"Failed to parse tool call arguments: {e}") 838 elif tool_status == 'error': 839 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 840 else: 841 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 842 logger.warning(f" Tool call ID '{tool_call_id}' not found in tool_call_results dict") 843 844 # Handle archival memory deletion if any were flagged (only if no halt was received) 845 if flagged_memories: 846 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion") 847 for flagged_memory in flagged_memories: 848 reason = flagged_memory['reason'] 849 memory_text = flagged_memory['memory_text'] 850 851 try: 852 # Search for passages with this exact text 853 logger.debug(f"Searching for passages matching: {memory_text[:100]}...") 854 passages = CLIENT.agents.passages.list( 855 agent_id=void_agent.id, 856 query=memory_text 857 ) 858 859 if not passages: 860 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...") 861 continue 862 863 # Delete all matching passages 864 deleted_count = 0 865 for passage in passages: 866 # Check if the passage text exactly matches (to avoid partial matches) 867 if hasattr(passage, 'text') and passage.text == memory_text: 868 try: 869 CLIENT.agents.passages.delete( 870 agent_id=void_agent.id, 871 passage_id=str(passage.id) 872 ) 873 deleted_count += 1 874 logger.debug(f"Deleted passage {passage.id}") 875 except Exception as delete_error: 876 logger.error(f"Failed to delete passage {passage.id}: {delete_error}") 877 878 if deleted_count > 0: 879 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...") 880 else: 881 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...") 882 883 except Exception as e: 884 logger.error(f"Error processing memory deletion: {e}") 885 886 # Check for conflicting tool calls 887 if reply_candidates and ignored_notification: 888 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 889 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 890 logger.warning("Item will be left in queue for manual review") 891 # Return False to keep in queue 892 return False 893 894 if reply_candidates: 895 # Aggregate reply posts into a thread 896 reply_messages = [] 897 reply_langs = [] 898 for text, lang in reply_candidates: 899 reply_messages.append(text) 900 reply_langs.append(lang) 901 902 # Use the first language for the entire thread (could be enhanced later) 903 reply_lang = reply_langs[0] if reply_langs else 'en-US' 904 905 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 906 907 # Display the generated reply thread 908 if len(reply_messages) == 1: 909 content = reply_messages[0] 910 title = f"Reply to @{author_handle}" 911 else: 912 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 913 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 914 915 # Format with Unicode characters 916 print(f"\n{title}") 917 print(f" {'' * len(title)}") 918 # Indent content lines 919 for line in content.split('\n'): 920 print(f" {line}") 921 922 # Send the reply(s) with language (unless in testing mode) 923 if testing_mode: 924 logger.info("TESTING MODE: Skipping actual Bluesky post") 925 response = True # Simulate success 926 else: 927 if len(reply_messages) == 1: 928 # Single reply - use existing function 929 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 930 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 931 response = bsky_utils.reply_to_notification( 932 client=atproto_client, 933 notification=notification_data, 934 reply_text=cleaned_text, 935 lang=reply_lang, 936 correlation_id=correlation_id 937 ) 938 else: 939 # Multiple replies - use new threaded function 940 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 941 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 942 response = bsky_utils.reply_with_thread_to_notification( 943 client=atproto_client, 944 notification=notification_data, 945 reply_messages=cleaned_messages, 946 lang=reply_lang, 947 correlation_id=correlation_id 948 ) 949 950 if response: 951 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={ 952 'correlation_id': correlation_id, 953 'author_handle': author_handle, 954 'reply_count': len(reply_messages) 955 }) 956 957 # Acknowledge the post we're replying to with stream.thought.ack 958 try: 959 post_uri = notification_data.get('uri') 960 post_cid = notification_data.get('cid') 961 962 if post_uri and post_cid: 963 ack_result = bsky_utils.acknowledge_post( 964 client=atproto_client, 965 post_uri=post_uri, 966 post_cid=post_cid, 967 note=ack_note 968 ) 969 if ack_result: 970 if ack_note: 971 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 972 else: 973 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 974 else: 975 logger.warning(f"Failed to acknowledge post from @{author_handle}") 976 else: 977 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 978 except Exception as e: 979 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 980 # Don't fail the entire operation if acknowledgment fails 981 982 return True 983 else: 984 logger.error(f"Failed to send reply to @{author_handle}") 985 return False 986 else: 987 # Check if notification was explicitly ignored 988 if ignored_notification: 989 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={ 990 'correlation_id': correlation_id, 991 'author_handle': author_handle, 992 'ignore_category': ignore_category 993 }) 994 return "ignored" 995 else: 996 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={ 997 'correlation_id': correlation_id, 998 'author_handle': author_handle 999 }) 1000 return "no_reply" 1001 1002 except Exception as e: 1003 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={ 1004 'correlation_id': correlation_id, 1005 'error': str(e), 1006 'error_type': type(e).__name__, 1007 'author_handle': author_handle if 'author_handle' in locals() else 'unknown' 1008 }) 1009 return False 1010 finally: 1011 # Detach user blocks after agent response (success or failure) 1012 if 'attached_handles' in locals() and attached_handles: 1013 try: 1014 logger.info(f"Detaching user blocks for handles: {attached_handles}") 1015 detach_result = detach_user_blocks(attached_handles, void_agent) 1016 logger.debug(f"Detach result: {detach_result}") 1017 except Exception as detach_error: 1018 logger.warning(f"Failed to detach user blocks: {detach_error}") 1019 1020 1021def notification_to_dict(notification): 1022 """Convert a notification object to a dictionary for JSON serialization.""" 1023 return { 1024 'uri': notification.uri, 1025 'cid': notification.cid, 1026 'reason': notification.reason, 1027 'is_read': notification.is_read, 1028 'indexed_at': notification.indexed_at, 1029 'author': { 1030 'handle': notification.author.handle, 1031 'display_name': notification.author.display_name, 1032 'did': notification.author.did 1033 }, 1034 'record': { 1035 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 1036 } 1037 } 1038 1039 1040def load_processed_notifications(): 1041 """Load the set of processed notification URIs from database.""" 1042 global NOTIFICATION_DB 1043 if NOTIFICATION_DB: 1044 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS) 1045 return set() 1046 1047 1048def save_processed_notifications(processed_set): 1049 """Save the set of processed notification URIs to database.""" 1050 # This is now handled by marking individual notifications in the DB 1051 # Keeping function for compatibility but it doesn't need to do anything 1052 pass 1053 1054 1055def save_notification_to_queue(notification, is_priority=None): 1056 """Save a notification to the queue directory with priority-based filename.""" 1057 try: 1058 global NOTIFICATION_DB 1059 1060 # Handle both notification objects and dicts 1061 if isinstance(notification, dict): 1062 notif_dict = notification 1063 notification_uri = notification.get('uri') 1064 else: 1065 notif_dict = notification_to_dict(notification) 1066 notification_uri = notification.uri 1067 1068 # Check if already processed (using database if available) 1069 if NOTIFICATION_DB: 1070 if NOTIFICATION_DB.is_processed(notification_uri): 1071 logger.debug(f"Notification already processed (DB): {notification_uri}") 1072 return False 1073 # Add to database - if this fails, don't queue the notification 1074 if not NOTIFICATION_DB.add_notification(notif_dict): 1075 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}") 1076 return False 1077 else: 1078 # Fall back to old JSON method 1079 processed_uris = load_processed_notifications() 1080 if notification_uri in processed_uris: 1081 logger.debug(f"Notification already processed: {notification_uri}") 1082 return False 1083 1084 # Create JSON string 1085 notif_json = json.dumps(notif_dict, sort_keys=True) 1086 1087 # Generate hash for filename (to avoid duplicates) 1088 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 1089 1090 # Determine priority based on author handle or explicit priority 1091 if is_priority is not None: 1092 priority_prefix = "0_" if is_priority else "1_" 1093 else: 1094 if isinstance(notification, dict): 1095 author_handle = notification.get('author', {}).get('handle', '') 1096 else: 1097 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 1098 # Prioritize cameron.pfiffer.org responses 1099 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 1100 1101 # Create filename with priority, timestamp and hash 1102 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 1103 reason = notif_dict.get('reason', 'unknown') 1104 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 1105 filepath = QUEUE_DIR / filename 1106 1107 # Check if this notification URI is already in the queue 1108 for existing_file in QUEUE_DIR.glob("*.json"): 1109 if existing_file.name == "processed_notifications.json": 1110 continue 1111 try: 1112 with open(existing_file, 'r') as f: 1113 existing_data = json.load(f) 1114 if existing_data.get('uri') == notification_uri: 1115 logger.debug(f"Notification already queued (URI: {notification_uri})") 1116 return False 1117 except: 1118 continue 1119 1120 # Write to file 1121 with open(filepath, 'w') as f: 1122 json.dump(notif_dict, f, indent=2) 1123 1124 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 1125 logger.info(f"Queued notification ({priority_label}): {filename}") 1126 return True 1127 1128 except Exception as e: 1129 logger.error(f"Error saving notification to queue: {e}") 1130 return False 1131 1132 1133def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 1134 """Load and process all notifications from the queue in priority order.""" 1135 try: 1136 # Get all JSON files in queue directory (excluding processed_notifications.json) 1137 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 1138 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1139 1140 # Filter out and delete like notifications immediately 1141 queue_files = [] 1142 likes_deleted = 0 1143 1144 for filepath in all_queue_files: 1145 try: 1146 with open(filepath, 'r') as f: 1147 notif_data = json.load(f) 1148 1149 # If it's a like, delete it immediately and don't process 1150 if notif_data.get('reason') == 'like': 1151 filepath.unlink() 1152 likes_deleted += 1 1153 logger.debug(f"Deleted like notification: {filepath.name}") 1154 else: 1155 queue_files.append(filepath) 1156 except Exception as e: 1157 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1158 queue_files.append(filepath) # Keep it in case it's valid 1159 1160 if likes_deleted > 0: 1161 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1162 1163 if not queue_files: 1164 return 1165 1166 logger.info(f"Processing {len(queue_files)} queued notifications") 1167 1168 # Log current statistics 1169 elapsed_time = time.time() - start_time 1170 total_messages = sum(message_counters.values()) 1171 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1172 1173 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1174 1175 for i, filepath in enumerate(queue_files, 1): 1176 # Determine if this is a priority notification 1177 is_priority = filepath.name.startswith("0_") 1178 1179 # Check for new notifications periodically during queue processing 1180 # Also check immediately after processing each priority item 1181 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) 1182 1183 # If we just processed a priority item, immediately check for new priority notifications 1184 if is_priority and i > 1: 1185 should_check_notifications = True 1186 1187 if should_check_notifications: 1188 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1189 try: 1190 # Fetch and queue new notifications without processing them 1191 new_count = fetch_and_queue_new_notifications(atproto_client) 1192 1193 if new_count > 0: 1194 logger.info(f"Added {new_count} new notifications to queue") 1195 # Reload the queue files to include the new items 1196 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1197 queue_files = updated_queue_files 1198 logger.info(f"Queue updated: now {len(queue_files)} total items") 1199 except Exception as e: 1200 logger.error(f"Error checking for new notifications: {e}") 1201 1202 priority_label = " [PRIORITY]" if is_priority else "" 1203 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") 1204 try: 1205 # Load notification data 1206 with open(filepath, 'r') as f: 1207 notif_data = json.load(f) 1208 1209 # Process based on type using dict data directly 1210 success = False 1211 if notif_data['reason'] == "mention": 1212 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1213 if success: 1214 message_counters['mentions'] += 1 1215 elif notif_data['reason'] == "reply": 1216 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1217 if success: 1218 message_counters['replies'] += 1 1219 elif notif_data['reason'] == "follow": 1220 author_handle = notif_data['author']['handle'] 1221 author_display_name = notif_data['author'].get('display_name', 'no display name') 1222 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1223 follow_message = f"Update: {follow_update}" 1224 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") 1225 CLIENT.agents.messages.create( 1226 agent_id = void_agent.id, 1227 messages = [{"role":"user", "content": follow_message}] 1228 ) 1229 success = True # Follow updates are always successful 1230 if success: 1231 message_counters['follows'] += 1 1232 elif notif_data['reason'] == "repost": 1233 # Skip reposts silently 1234 success = True # Skip reposts but mark as successful to remove from queue 1235 if success: 1236 message_counters['reposts_skipped'] += 1 1237 elif notif_data['reason'] == "like": 1238 # Skip likes silently 1239 success = True # Skip likes but mark as successful to remove from queue 1240 if success: 1241 message_counters.setdefault('likes_skipped', 0) 1242 message_counters['likes_skipped'] += 1 1243 else: 1244 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1245 success = True # Remove unknown types from queue 1246 1247 # Handle file based on processing result 1248 if success: 1249 if testing_mode: 1250 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1251 else: 1252 filepath.unlink() 1253 logger.info(f"Successfully processed and removed: {filepath.name}") 1254 1255 # Mark as processed to avoid reprocessing 1256 if NOTIFICATION_DB: 1257 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed') 1258 else: 1259 processed_uris = load_processed_notifications() 1260 processed_uris.add(notif_data['uri']) 1261 save_processed_notifications(processed_uris) 1262 1263 elif success is None: # Special case for moving to error directory 1264 error_path = QUEUE_ERROR_DIR / filepath.name 1265 filepath.rename(error_path) 1266 logger.warning(f"Moved {filepath.name} to errors directory") 1267 1268 # Also mark as processed to avoid retrying 1269 if NOTIFICATION_DB: 1270 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1271 else: 1272 processed_uris = load_processed_notifications() 1273 processed_uris.add(notif_data['uri']) 1274 save_processed_notifications(processed_uris) 1275 1276 elif success == "no_reply": # Special case for moving to no_reply directory 1277 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1278 filepath.rename(no_reply_path) 1279 logger.info(f"Moved {filepath.name} to no_reply directory") 1280 1281 # Also mark as processed to avoid retrying 1282 if NOTIFICATION_DB: 1283 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1284 else: 1285 processed_uris = load_processed_notifications() 1286 processed_uris.add(notif_data['uri']) 1287 save_processed_notifications(processed_uris) 1288 1289 elif success == "ignored": # Special case for explicitly ignored notifications 1290 # For ignored notifications, we just delete them (not move to no_reply) 1291 filepath.unlink() 1292 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1293 1294 # Also mark as processed to avoid retrying 1295 if NOTIFICATION_DB: 1296 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1297 else: 1298 processed_uris = load_processed_notifications() 1299 processed_uris.add(notif_data['uri']) 1300 save_processed_notifications(processed_uris) 1301 1302 else: 1303 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1304 1305 except Exception as e: 1306 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1307 # Keep the file for retry later 1308 1309 except Exception as e: 1310 logger.error(f"Error loading queued notifications: {e}") 1311 1312 1313def fetch_and_queue_new_notifications(atproto_client): 1314 """Fetch new notifications and queue them without processing.""" 1315 try: 1316 global NOTIFICATION_DB 1317 1318 # Get current time for marking notifications as seen 1319 logger.debug("Getting current time for notification marking...") 1320 last_seen_at = atproto_client.get_current_time_iso() 1321 1322 # Get timestamp of last processed notification for filtering 1323 last_processed_time = None 1324 if NOTIFICATION_DB: 1325 last_processed_time = NOTIFICATION_DB.get_latest_processed_time() 1326 if last_processed_time: 1327 logger.debug(f"Last processed notification was at: {last_processed_time}") 1328 1329 # Fetch ALL notifications using pagination 1330 all_notifications = [] 1331 cursor = None 1332 page_count = 0 1333 max_pages = 20 # Safety limit to prevent infinite loops 1334 1335 while page_count < max_pages: 1336 try: 1337 # Fetch notifications page 1338 if cursor: 1339 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1340 params={'cursor': cursor, 'limit': 100} 1341 ) 1342 else: 1343 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1344 params={'limit': 100} 1345 ) 1346 1347 page_count += 1 1348 page_notifications = notifications_response.notifications 1349 1350 if not page_notifications: 1351 break 1352 1353 all_notifications.extend(page_notifications) 1354 1355 # Check if there are more pages 1356 cursor = getattr(notifications_response, 'cursor', None) 1357 if not cursor: 1358 break 1359 1360 except Exception as e: 1361 logger.error(f"Error fetching notifications page {page_count}: {e}") 1362 break 1363 1364 # Now process all fetched notifications 1365 new_count = 0 1366 if all_notifications: 1367 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API") 1368 1369 # Mark as seen first 1370 try: 1371 atproto_client.app.bsky.notification.update_seen( 1372 data={'seenAt': last_seen_at} 1373 ) 1374 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1375 except Exception as e: 1376 logger.error(f"Error marking notifications as seen: {e}") 1377 1378 # Debug counters 1379 skipped_read = 0 1380 skipped_likes = 0 1381 skipped_processed = 0 1382 skipped_old_timestamp = 0 1383 processed_uris = load_processed_notifications() 1384 1385 # Queue all new notifications (except likes) 1386 for notif in all_notifications: 1387 # Skip if older than last processed (when we have timestamp filtering) 1388 if last_processed_time and hasattr(notif, 'indexed_at'): 1389 if notif.indexed_at <= last_processed_time: 1390 skipped_old_timestamp += 1 1391 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})") 1392 continue 1393 1394 # Debug: Log is_read status but DON'T skip based on it 1395 if hasattr(notif, 'is_read') and notif.is_read: 1396 skipped_read += 1 1397 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}") 1398 1399 # Skip likes 1400 if hasattr(notif, 'reason') and notif.reason == 'like': 1401 skipped_likes += 1 1402 continue 1403 1404 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1405 1406 # Skip likes in dict form too 1407 if notif_dict.get('reason') == 'like': 1408 continue 1409 1410 # Check if already processed 1411 notif_uri = notif_dict.get('uri', '') 1412 if notif_uri in processed_uris: 1413 skipped_processed += 1 1414 logger.debug(f"Skipping already processed: {notif_uri}") 1415 continue 1416 1417 # Check if it's a priority notification 1418 is_priority = False 1419 1420 # Priority for cameron.pfiffer.org notifications 1421 author_handle = notif_dict.get('author', {}).get('handle', '') 1422 if author_handle == "cameron.pfiffer.org": 1423 is_priority = True 1424 1425 # Also check for priority keywords in mentions 1426 if notif_dict.get('reason') == 'mention': 1427 # Get the mention text to check for priority keywords 1428 record = notif_dict.get('record', {}) 1429 text = record.get('text', '') 1430 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1431 is_priority = True 1432 1433 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1434 new_count += 1 1435 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}") 1436 1437 # Log summary of filtering 1438 logger.info(f"📊 Notification processing summary:") 1439 logger.info(f" • Total fetched: {len(all_notifications)}") 1440 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)") 1441 logger.info(f" • Skipped (likes): {skipped_likes}") 1442 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}") 1443 logger.info(f" • Skipped (already processed): {skipped_processed}") 1444 logger.info(f" • Queued for processing: {new_count}") 1445 else: 1446 logger.debug("No new notifications to queue") 1447 1448 return new_count 1449 1450 except Exception as e: 1451 logger.error(f"Error fetching and queueing notifications: {e}") 1452 return 0 1453 1454 1455def process_notifications(void_agent, atproto_client, testing_mode=False): 1456 """Fetch new notifications, queue them, and process the queue.""" 1457 try: 1458 # Fetch and queue new notifications 1459 new_count = fetch_and_queue_new_notifications(atproto_client) 1460 1461 if new_count > 0: 1462 logger.info(f"Found {new_count} new notifications to process") 1463 1464 # Now process the entire queue (old + new notifications) 1465 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1466 1467 except Exception as e: 1468 logger.error(f"Error processing notifications: {e}") 1469 1470 1471def send_synthesis_message(client: Letta, agent_id: str, agent_name: str = "void", atproto_client=None) -> None: 1472 """ 1473 Send a synthesis message to the agent every 10 minutes. 1474 This prompts the agent to synthesize its recent experiences. 1475 1476 Args: 1477 client: Letta client 1478 agent_id: Agent ID to send synthesis to 1479 agent_name: Agent name for temporal block labels 1480 atproto_client: Optional AT Protocol client for posting synthesis results 1481 """ 1482 # Track attached temporal blocks for cleanup 1483 attached_temporal_labels = [] 1484 1485 try: 1486 logger.info("🧠 Preparing synthesis with temporal journal blocks") 1487 1488 # Attach temporal blocks before synthesis 1489 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id, agent_name) 1490 if not success: 1491 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway") 1492 1493 # Create synthesis prompt with agent-specific block names 1494 today = date.today() 1495 synthesis_prompt = f"""Time for synthesis. 1496 1497This is your periodic opportunity to reflect on recent experiences and update your memory. 1498 1499The following temporal journal blocks are temporarily available for this session: 1500- {agent_name}_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1501- {agent_name}_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1502- {agent_name}_year_{today.year}: This year's journal ({today.year}) 1503 1504You may use these blocks as you see fit. Synthesize your recent experiences into your memory as appropriate.""" 1505 1506 logger.info("🧠 Sending enhanced synthesis prompt to agent") 1507 1508 # Send synthesis message with streaming to show tool use 1509 message_stream = client.agents.messages.create_stream( 1510 agent_id=agent_id, 1511 messages=[{"role": "user", "content": synthesis_prompt}], 1512 stream_tokens=False, 1513 max_steps=100 1514 ) 1515 1516 # Track synthesis content for potential posting 1517 synthesis_posts = [] 1518 ack_note = None 1519 1520 # Process the streaming response 1521 for chunk in message_stream: 1522 if hasattr(chunk, 'message_type'): 1523 if chunk.message_type == 'reasoning_message': 1524 if SHOW_REASONING: 1525 print("\n◆ Reasoning") 1526 print(" ─────────") 1527 for line in chunk.reasoning.split('\n'): 1528 print(f" {line}") 1529 1530 # Create ATProto record for reasoning (if we have atproto client) 1531 if atproto_client and hasattr(chunk, 'reasoning'): 1532 try: 1533 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 1534 except Exception as e: 1535 logger.debug(f"Failed to create reasoning record during synthesis: {e}") 1536 elif chunk.message_type == 'tool_call_message': 1537 tool_name = chunk.tool_call.name 1538 1539 # Create ATProto record for tool call (if we have atproto client) 1540 if atproto_client: 1541 try: 1542 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 1543 bsky_utils.create_tool_call_record( 1544 atproto_client, 1545 tool_name, 1546 chunk.tool_call.arguments, 1547 tool_call_id 1548 ) 1549 except Exception as e: 1550 logger.debug(f"Failed to create tool call record during synthesis: {e}") 1551 try: 1552 args = json.loads(chunk.tool_call.arguments) 1553 if tool_name == 'archival_memory_search': 1554 query = args.get('query', 'unknown') 1555 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1556 elif tool_name == 'archival_memory_insert': 1557 content = args.get('content', '') 1558 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1559 1560 # Record archival memory insert to AT Protocol 1561 if atproto_client: 1562 try: 1563 tags_str = args.get('tags', None) 1564 tags = None 1565 if tags_str: 1566 try: 1567 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str 1568 except json.JSONDecodeError: 1569 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...") 1570 1571 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags) 1572 if memory_result: 1573 tags_info = f" ({len(tags)} tags)" if tags else "" 1574 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}") 1575 except Exception as e: 1576 logger.debug(f"Failed to create memory record during synthesis: {e}") 1577 elif tool_name == 'update_block': 1578 label = args.get('label', 'unknown') 1579 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1580 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1581 elif tool_name == 'annotate_ack': 1582 note = args.get('note', '') 1583 if note: 1584 ack_note = note 1585 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1586 elif tool_name == 'add_post_to_bluesky_reply_thread': 1587 text = args.get('text', '') 1588 synthesis_posts.append(text) 1589 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1590 else: 1591 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1592 if len(args_str) > 150: 1593 args_str = args_str[:150] + "..." 1594 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1595 except: 1596 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1597 elif chunk.message_type == 'tool_return_message': 1598 if chunk.status == 'success': 1599 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1600 else: 1601 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1602 elif chunk.message_type == 'assistant_message': 1603 print("\n▶ Synthesis Response") 1604 print(" ──────────────────") 1605 for line in chunk.content.split('\n'): 1606 print(f" {line}") 1607 1608 if str(chunk) == 'done': 1609 break 1610 1611 logger.info("🧠 Synthesis message processed successfully") 1612 1613 # Handle synthesis acknowledgments if we have an atproto client 1614 if atproto_client and ack_note: 1615 try: 1616 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1617 if result: 1618 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1619 else: 1620 logger.warning("Failed to create synthesis acknowledgment") 1621 except Exception as e: 1622 logger.error(f"Error creating synthesis acknowledgment: {e}") 1623 1624 # Handle synthesis posts if any were generated 1625 if atproto_client and synthesis_posts: 1626 try: 1627 for post_text in synthesis_posts: 1628 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1629 response = bsky_utils.send_post(atproto_client, cleaned_text) 1630 if response: 1631 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1632 else: 1633 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1634 except Exception as e: 1635 logger.error(f"Error posting synthesis content: {e}") 1636 1637 except Exception as e: 1638 logger.error(f"Error sending synthesis message: {e}") 1639 finally: 1640 # Always detach temporal blocks after synthesis 1641 if attached_temporal_labels: 1642 logger.info("🧠 Detaching temporal journal blocks after synthesis") 1643 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels, agent_name) 1644 if not detach_success: 1645 logger.warning("Some temporal blocks may not have been detached properly") 1646 1647 1648def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: 1649 """ 1650 Detach all user blocks from the agent to prevent memory bloat. 1651 This should be called periodically to ensure clean state. 1652 """ 1653 try: 1654 # Get all blocks attached to the agent 1655 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1656 1657 user_blocks_to_detach = [] 1658 for block in attached_blocks: 1659 if hasattr(block, 'label') and block.label.startswith('user_'): 1660 user_blocks_to_detach.append({ 1661 'label': block.label, 1662 'id': block.id 1663 }) 1664 1665 if not user_blocks_to_detach: 1666 logger.debug("No user blocks found to detach during periodic cleanup") 1667 return 1668 1669 # Detach each user block 1670 detached_count = 0 1671 for block_info in user_blocks_to_detach: 1672 try: 1673 client.agents.blocks.detach( 1674 agent_id=agent_id, 1675 block_id=str(block_info['id']) 1676 ) 1677 detached_count += 1 1678 logger.debug(f"Detached user block: {block_info['label']}") 1679 except Exception as e: 1680 logger.warning(f"Failed to detach block {block_info['label']}: {e}") 1681 1682 if detached_count > 0: 1683 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") 1684 1685 except Exception as e: 1686 logger.error(f"Error during periodic user block cleanup: {e}") 1687 1688 1689def attach_temporal_blocks(client: Letta, agent_id: str, agent_name: str = "void") -> tuple: 1690 """ 1691 Attach temporal journal blocks (day, month, year) to the agent for synthesis. 1692 Creates blocks if they don't exist. 1693 1694 Args: 1695 client: Letta client 1696 agent_id: Agent ID 1697 agent_name: Agent name for prefixing block labels (prevents collision across agents) 1698 1699 Returns: 1700 Tuple of (success: bool, attached_labels: list) 1701 """ 1702 try: 1703 today = date.today() 1704 1705 # Generate temporal block labels with agent-specific prefix 1706 day_label = f"{agent_name}_day_{today.strftime('%Y_%m_%d')}" 1707 month_label = f"{agent_name}_month_{today.strftime('%Y_%m')}" 1708 year_label = f"{agent_name}_year_{today.year}" 1709 1710 temporal_labels = [day_label, month_label, year_label] 1711 attached_labels = [] 1712 1713 # Get current blocks attached to agent 1714 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1715 current_block_labels = {block.label for block in current_blocks} 1716 current_block_ids = {str(block.id) for block in current_blocks} 1717 1718 for label in temporal_labels: 1719 try: 1720 # Skip if already attached 1721 if label in current_block_labels: 1722 logger.debug(f"Temporal block already attached: {label}") 1723 attached_labels.append(label) 1724 continue 1725 1726 # Check if block exists globally 1727 blocks = client.blocks.list(label=label) 1728 1729 if blocks and len(blocks) > 0: 1730 block = blocks[0] 1731 # Check if already attached by ID 1732 if str(block.id) in current_block_ids: 1733 logger.debug(f"Temporal block already attached by ID: {label}") 1734 attached_labels.append(label) 1735 continue 1736 else: 1737 # Create new temporal block with appropriate header 1738 if "day" in label: 1739 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}" 1740 initial_content = f"{header}\n\nNo entries yet for today." 1741 elif "month" in label: 1742 header = f"# Monthly Journal - {today.strftime('%B %Y')}" 1743 initial_content = f"{header}\n\nNo entries yet for this month." 1744 else: # year 1745 header = f"# Yearly Journal - {today.year}" 1746 initial_content = f"{header}\n\nNo entries yet for this year." 1747 1748 block = client.blocks.create( 1749 label=label, 1750 value=initial_content, 1751 limit=10000 # Larger limit for journal blocks 1752 ) 1753 logger.info(f"Created new temporal block: {label}") 1754 1755 # Attach the block 1756 client.agents.blocks.attach( 1757 agent_id=agent_id, 1758 block_id=str(block.id) 1759 ) 1760 attached_labels.append(label) 1761 logger.info(f"Attached temporal block: {label}") 1762 1763 except Exception as e: 1764 # Check for duplicate constraint errors 1765 error_str = str(e) 1766 if "duplicate key value violates unique constraint" in error_str: 1767 logger.debug(f"Temporal block already attached (constraint): {label}") 1768 attached_labels.append(label) 1769 else: 1770 logger.warning(f"Failed to attach temporal block {label}: {e}") 1771 1772 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}") 1773 return True, attached_labels 1774 1775 except Exception as e: 1776 logger.error(f"Error attaching temporal blocks: {e}") 1777 return False, [] 1778 1779 1780def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None, agent_name: str = "void") -> bool: 1781 """ 1782 Detach temporal journal blocks from the agent after synthesis. 1783 1784 Args: 1785 client: Letta client 1786 agent_id: Agent ID 1787 labels_to_detach: Optional list of specific labels to detach. 1788 If None, detaches all temporal blocks for this agent. 1789 agent_name: Agent name for prefixing block labels (prevents collision across agents) 1790 1791 Returns: 1792 bool: Success status 1793 """ 1794 try: 1795 # If no specific labels provided, generate today's labels 1796 if labels_to_detach is None: 1797 today = date.today() 1798 labels_to_detach = [ 1799 f"{agent_name}_day_{today.strftime('%Y_%m_%d')}", 1800 f"{agent_name}_month_{today.strftime('%Y_%m')}", 1801 f"{agent_name}_year_{today.year}" 1802 ] 1803 1804 # Get current blocks and build label to ID mapping 1805 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1806 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 1807 1808 detached_count = 0 1809 for label in labels_to_detach: 1810 if label in block_label_to_id: 1811 try: 1812 client.agents.blocks.detach( 1813 agent_id=agent_id, 1814 block_id=block_label_to_id[label] 1815 ) 1816 detached_count += 1 1817 logger.debug(f"Detached temporal block: {label}") 1818 except Exception as e: 1819 logger.warning(f"Failed to detach temporal block {label}: {e}") 1820 else: 1821 logger.debug(f"Temporal block not attached: {label}") 1822 1823 logger.info(f"Detached {detached_count} temporal blocks") 1824 return True 1825 1826 except Exception as e: 1827 logger.error(f"Error detaching temporal blocks: {e}") 1828 return False 1829 1830 1831def main(): 1832 # Parse command line arguments 1833 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1834 parser.add_argument('--config', type=str, default='config.yaml', help='Path to config file (default: config.yaml)') 1835 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1836 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1837 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1838 # --rich option removed as we now use simple text formatting 1839 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1840 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1841 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1842 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1843 parser.add_argument('--debug', action='store_true', help='Enable debug logging') 1844 args = parser.parse_args() 1845 1846 # Initialize configuration with custom path 1847 global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB 1848 get_config(args.config) # Initialize the global config instance 1849 letta_config = get_letta_config() 1850 1851 # Initialize queue paths from config 1852 queue_config = get_queue_config() 1853 QUEUE_DIR = Path(queue_config['base_dir']) 1854 QUEUE_ERROR_DIR = Path(queue_config['error_dir']) 1855 QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir']) 1856 PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file']) 1857 1858 # Create queue directories 1859 QUEUE_DIR.mkdir(exist_ok=True) 1860 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 1861 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 1862 1863 # Create Letta client with configuration 1864 CLIENT_PARAMS = { 1865 'token': letta_config['api_key'], 1866 'timeout': letta_config['timeout'] 1867 } 1868 if letta_config.get('base_url'): 1869 CLIENT_PARAMS['base_url'] = letta_config['base_url'] 1870 CLIENT = Letta(**CLIENT_PARAMS) 1871 1872 # Configure logging based on command line arguments 1873 if args.simple_logs: 1874 log_format = "void - %(levelname)s - %(message)s" 1875 else: 1876 # Create custom formatter with symbols 1877 class SymbolFormatter(logging.Formatter): 1878 """Custom formatter that adds symbols for different log levels""" 1879 1880 SYMBOLS = { 1881 logging.DEBUG: '', 1882 logging.INFO: '', 1883 logging.WARNING: '', 1884 logging.ERROR: '', 1885 logging.CRITICAL: '' 1886 } 1887 1888 def format(self, record): 1889 # Get the symbol for this log level 1890 symbol = self.SYMBOLS.get(record.levelno, '') 1891 1892 # Format time as HH:MM:SS 1893 timestamp = self.formatTime(record, "%H:%M:%S") 1894 1895 # Build the formatted message 1896 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1897 1898 # Use vertical bar as separator 1899 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1900 1901 return ' '.join(parts) 1902 1903 # Reset logging configuration 1904 for handler in logging.root.handlers[:]: 1905 logging.root.removeHandler(handler) 1906 1907 # Create handler with custom formatter 1908 handler = logging.StreamHandler() 1909 if not args.simple_logs: 1910 handler.setFormatter(SymbolFormatter()) 1911 else: 1912 handler.setFormatter(logging.Formatter(log_format)) 1913 1914 # Configure root logger 1915 logging.root.setLevel(logging.INFO) 1916 logging.root.addHandler(handler) 1917 1918 global logger, prompt_logger, console 1919 logger = logging.getLogger("void_bot") 1920 logger.setLevel(logging.DEBUG if args.debug else logging.INFO) 1921 1922 # Create a separate logger for prompts (set to WARNING to hide by default) 1923 prompt_logger = logging.getLogger("void_bot.prompts") 1924 if args.reasoning: 1925 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1926 else: 1927 prompt_logger.setLevel(logging.WARNING) # Hide by default 1928 1929 # Disable httpx logging completely 1930 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1931 1932 # Create Rich console for pretty printing 1933 # Console no longer used - simple text formatting 1934 1935 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1936 TESTING_MODE = args.test 1937 1938 # Store no-git flag globally for use in export_agent_state calls 1939 SKIP_GIT = args.no_git 1940 1941 # Store rich flag globally 1942 # Rich formatting no longer used 1943 1944 # Store reasoning flag globally 1945 SHOW_REASONING = args.reasoning 1946 1947 if TESTING_MODE: 1948 logger.info("=== RUNNING IN TESTING MODE ===") 1949 logger.info(" - No messages will be sent to Bluesky") 1950 logger.info(" - Queue files will not be deleted") 1951 logger.info(" - Notifications will not be marked as seen") 1952 print("\n") 1953 1954 # Check for synthesis-only mode 1955 SYNTHESIS_ONLY = args.synthesis_only 1956 if SYNTHESIS_ONLY: 1957 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1958 logger.info(" - Only synthesis messages will be sent") 1959 logger.info(" - No notification processing") 1960 logger.info(" - No Bluesky client needed") 1961 print("\n") 1962 """Main bot loop that continuously monitors for notifications.""" 1963 global start_time 1964 start_time = time.time() 1965 logger.info("=== STARTING VOID BOT ===") 1966 void_agent = initialize_void() 1967 logger.info(f"Void agent initialized: {void_agent.id}") 1968 1969 # Initialize notification database with config-based path 1970 logger.info("Initializing notification database...") 1971 NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path']) 1972 1973 # Migrate from old JSON format if it exists 1974 if PROCESSED_NOTIFICATIONS_FILE.exists(): 1975 logger.info("Found old processed_notifications.json, migrating to database...") 1976 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE)) 1977 1978 # Log database stats 1979 db_stats = NOTIFICATION_DB.get_stats() 1980 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}") 1981 1982 # Clean up old records 1983 NOTIFICATION_DB.cleanup_old_records(days=7) 1984 1985 # Ensure correct tools are attached for Bluesky 1986 logger.info("Configuring tools for Bluesky platform...") 1987 try: 1988 from tool_manager import ensure_platform_tools 1989 ensure_platform_tools('bluesky', void_agent.id) 1990 except Exception as e: 1991 logger.error(f"Failed to configure platform tools: {e}") 1992 logger.warning("Continuing with existing tool configuration") 1993 1994 # Check if agent has required tools 1995 if hasattr(void_agent, 'tools') and void_agent.tools: 1996 tool_names = [tool.name for tool in void_agent.tools] 1997 # Check for bluesky-related tools 1998 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1999 if not bluesky_tools: 2000 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 2001 else: 2002 logger.warning("Agent has no tools registered!") 2003 2004 # Clean up all user blocks at startup 2005 logger.info("🧹 Cleaning up user blocks at startup...") 2006 periodic_user_block_cleanup(CLIENT, void_agent.id) 2007 2008 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 2009 if not SYNTHESIS_ONLY: 2010 atproto_client = bsky_utils.default_login() 2011 logger.info("Connected to Bluesky") 2012 else: 2013 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 2014 if not args.test: 2015 atproto_client = bsky_utils.default_login() 2016 logger.info("Connected to Bluesky (for synthesis acks/posts)") 2017 else: 2018 atproto_client = None 2019 logger.info("Skipping Bluesky connection (test mode)") 2020 2021 # Configure intervals 2022 CLEANUP_INTERVAL = args.cleanup_interval 2023 SYNTHESIS_INTERVAL = args.synthesis_interval 2024 2025 # Synthesis-only mode 2026 if SYNTHESIS_ONLY: 2027 if SYNTHESIS_INTERVAL <= 0: 2028 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 2029 return 2030 2031 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2032 2033 while True: 2034 try: 2035 # Send synthesis message immediately on first run 2036 logger.info("🧠 Sending synthesis message") 2037 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client) 2038 2039 # Wait for next interval 2040 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 2041 sleep(SYNTHESIS_INTERVAL) 2042 2043 except KeyboardInterrupt: 2044 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 2045 break 2046 except Exception as e: 2047 logger.error(f"Error in synthesis loop: {e}") 2048 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 2049 sleep(SYNTHESIS_INTERVAL) 2050 2051 # Normal mode with notification processing 2052 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 2053 2054 cycle_count = 0 2055 2056 if CLEANUP_INTERVAL > 0: 2057 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 2058 else: 2059 logger.info("User block cleanup disabled") 2060 2061 if SYNTHESIS_INTERVAL > 0: 2062 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2063 else: 2064 logger.info("Synthesis messages disabled") 2065 2066 while True: 2067 try: 2068 cycle_count += 1 2069 process_notifications(void_agent, atproto_client, TESTING_MODE) 2070 2071 # Check if synthesis interval has passed 2072 if SYNTHESIS_INTERVAL > 0: 2073 current_time = time.time() 2074 global last_synthesis_time 2075 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 2076 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 2077 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client) 2078 last_synthesis_time = current_time 2079 2080 # Run periodic cleanup every N cycles 2081 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 2082 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 2083 periodic_user_block_cleanup(CLIENT, void_agent.id) 2084 2085 # Check if autofollow is enabled and sync followers 2086 from config_loader import get_bluesky_config 2087 bluesky_config = get_bluesky_config() 2088 if bluesky_config.get('autofollow', False): 2089 logger.info("🔄 Syncing followers (autofollow enabled)") 2090 try: 2091 sync_result = bsky_utils.sync_followers(atproto_client, dry_run=False) 2092 if 'error' in sync_result: 2093 logger.error(f"Autofollow failed: {sync_result['error']}") 2094 else: 2095 if sync_result['newly_followed']: 2096 logger.info(f"✓ Followed {len(sync_result['newly_followed'])} new users: {', '.join(sync_result['newly_followed'])}") 2097 else: 2098 logger.debug(f"No new followers to follow back ({sync_result['followers_count']} followers, {sync_result['following_count']} following)") 2099 if sync_result.get('errors'): 2100 logger.warning(f"Some follows failed: {len(sync_result['errors'])} errors") 2101 except Exception as e: 2102 logger.error(f"Error during autofollow sync: {e}") 2103 2104 # Also check database health when doing cleanup 2105 if NOTIFICATION_DB: 2106 db_stats = NOTIFICATION_DB.get_stats() 2107 pending = db_stats.get('status_pending', 0) 2108 errors = db_stats.get('status_error', 0) 2109 2110 if pending > 50: 2111 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)") 2112 if errors > 20: 2113 logger.warning(f"⚠️ Queue health check: {errors} error notifications") 2114 2115 # Periodic cleanup of old records 2116 if cycle_count % (CLEANUP_INTERVAL * 10) == 0: # Every 100 cycles 2117 logger.info("Running database cleanup of old records...") 2118 NOTIFICATION_DB.cleanup_old_records(days=7) 2119 2120 # Log cycle completion with stats 2121 elapsed_time = time.time() - start_time 2122 total_messages = sum(message_counters.values()) 2123 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2124 2125 if total_messages > 0: 2126 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 2127 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 2128 2129 except KeyboardInterrupt: 2130 # Final stats 2131 elapsed_time = time.time() - start_time 2132 total_messages = sum(message_counters.values()) 2133 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2134 2135 logger.info("=== BOT STOPPED BY USER ===") 2136 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 2137 logger.info(f" - {message_counters['mentions']} mentions") 2138 logger.info(f" - {message_counters['replies']} replies") 2139 logger.info(f" - {message_counters['follows']} follows") 2140 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 2141 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 2142 2143 # Close database connection 2144 if NOTIFICATION_DB: 2145 logger.info("Closing database connection...") 2146 NOTIFICATION_DB.close() 2147 2148 break 2149 except Exception as e: 2150 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 2151 logger.error(f"Error details: {e}") 2152 # Wait a bit longer on errors 2153 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 2154 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 2155 2156 2157if __name__ == "__main__": 2158 main()