a digital person for bluesky
at main 108 kB view raw
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string, count_thread_posts 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20from config_loader import get_letta_config, get_config, get_queue_config 21 22import bsky_utils 23from datetime import date 24from notification_db import NotificationDB 25 26def extract_handles_from_data(data): 27 """Recursively extract all unique handles from nested data structure.""" 28 handles = set() 29 30 def _extract_recursive(obj): 31 if isinstance(obj, dict): 32 # Check if this dict has a 'handle' key 33 if 'handle' in obj: 34 handles.add(obj['handle']) 35 # Recursively check all values 36 for value in obj.values(): 37 _extract_recursive(value) 38 elif isinstance(obj, list): 39 # Recursively check all list items 40 for item in obj: 41 _extract_recursive(item) 42 43 _extract_recursive(data) 44 return list(handles) 45 46# Logging will be configured after argument parsing 47logger = None 48prompt_logger = None 49# Simple text formatting (Rich no longer used) 50SHOW_REASONING = False 51last_archival_query = "archival memory search" 52 53def log_with_panel(message, title=None, border_color="white"): 54 """Log a message with Unicode box-drawing characters""" 55 if title: 56 # Map old color names to appropriate symbols 57 symbol_map = { 58 "blue": "", # Tool calls 59 "green": "", # Success/completion 60 "yellow": "", # Reasoning 61 "red": "", # Errors 62 "white": "", # Default/mentions 63 "cyan": "", # Posts 64 } 65 symbol = symbol_map.get(border_color, "") 66 67 print(f"\n{symbol} {title}") 68 print(f" {'' * len(title)}") 69 # Indent message lines 70 for line in message.split('\n'): 71 print(f" {line}") 72 else: 73 print(message) 74 75 76# Load Letta configuration from config.yaml (will be initialized later with custom path if provided) 77letta_config = None 78CLIENT = None 79 80# Notification check delay 81FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 82 83# Check for new notifications every N queue items 84CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 85 86# Queue paths (will be initialized from config in main()) 87QUEUE_DIR = None 88QUEUE_ERROR_DIR = None 89QUEUE_NO_REPLY_DIR = None 90PROCESSED_NOTIFICATIONS_FILE = None 91 92# Maximum number of processed notifications to track 93MAX_PROCESSED_NOTIFICATIONS = 10000 94 95# Message tracking counters 96message_counters = defaultdict(int) 97start_time = time.time() 98 99# Testing mode flag 100TESTING_MODE = False 101 102# Skip git operations flag 103SKIP_GIT = False 104 105# Synthesis message tracking 106last_synthesis_time = time.time() 107 108# Database for notification tracking 109NOTIFICATION_DB = None 110 111def export_agent_state(client, agent, skip_git=False): 112 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 113 try: 114 # Confirm export with user unless git is being skipped 115 if not skip_git: 116 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 117 if response not in ['y', 'yes']: 118 logger.info("Agent export cancelled by user.") 119 return 120 else: 121 logger.info("Exporting agent state (git staging disabled)") 122 123 # Create directories if they don't exist 124 os.makedirs("agent_archive", exist_ok=True) 125 os.makedirs("agents", exist_ok=True) 126 127 # Export agent data 128 logger.info(f"Exporting agent {agent.id}. This takes some time...") 129 agent_data = client.agents.export_file(agent_id=agent.id) 130 131 # Save timestamped archive copy 132 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 133 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 134 with open(archive_file, 'w', encoding='utf-8') as f: 135 json.dump(agent_data, f, indent=2, ensure_ascii=False) 136 137 # Save current agent state 138 current_file = os.path.join("agents", "void.af") 139 with open(current_file, 'w', encoding='utf-8') as f: 140 json.dump(agent_data, f, indent=2, ensure_ascii=False) 141 142 logger.info(f"Agent exported to {archive_file} and {current_file}") 143 144 # Git add only the current agent file (archive is ignored) unless skip_git is True 145 if not skip_git: 146 try: 147 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 148 logger.info("Added current agent file to git staging") 149 except subprocess.CalledProcessError as e: 150 logger.warning(f"Failed to git add agent file: {e}") 151 152 except Exception as e: 153 logger.error(f"Failed to export agent: {e}") 154 155def initialize_void(): 156 logger.info("Starting void agent initialization...") 157 158 # Get the configured void agent by ID 159 logger.info("Loading void agent from config...") 160 agent_id = letta_config['agent_id'] 161 162 try: 163 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 164 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 165 except Exception as e: 166 logger.error(f"Failed to load void agent {agent_id}: {e}") 167 logger.error("Please ensure the agent_id in config.yaml is correct") 168 raise e 169 170 # Export agent state 171 logger.info("Exporting agent state...") 172 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 173 174 # Log agent details 175 logger.info(f"Void agent details - ID: {void_agent.id}") 176 logger.info(f"Agent name: {void_agent.name}") 177 if hasattr(void_agent, 'llm_config'): 178 logger.info(f"Agent model: {void_agent.llm_config.model}") 179 if hasattr(void_agent, 'project_id') and void_agent.project_id: 180 logger.info(f"Agent project_id: {void_agent.project_id}") 181 if hasattr(void_agent, 'tools'): 182 logger.info(f"Agent has {len(void_agent.tools)} tools") 183 for tool in void_agent.tools[:3]: # Show first 3 tools 184 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 185 186 return void_agent 187 188 189def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 190 """Process a mention and generate a reply using the Letta agent. 191 192 Args: 193 void_agent: The Letta agent instance 194 atproto_client: The AT Protocol client 195 notification_data: The notification data dictionary 196 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 197 198 Returns: 199 True: Successfully processed, remove from queue 200 False: Failed but retryable, keep in queue 201 None: Failed with non-retryable error, move to errors directory 202 "no_reply": No reply was generated, move to no_reply directory 203 """ 204 import uuid 205 206 # Generate correlation ID for tracking this notification through the pipeline 207 correlation_id = str(uuid.uuid4())[:8] 208 209 try: 210 logger.info(f"[{correlation_id}] Starting process_mention", extra={ 211 'correlation_id': correlation_id, 212 'notification_type': type(notification_data).__name__ 213 }) 214 215 # Handle both dict and object inputs for backwards compatibility 216 if isinstance(notification_data, dict): 217 uri = notification_data['uri'] 218 mention_text = notification_data.get('record', {}).get('text', '') 219 author_handle = notification_data['author']['handle'] 220 author_name = notification_data['author'].get('display_name') or author_handle 221 else: 222 # Legacy object access 223 uri = notification_data.uri 224 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 225 author_handle = notification_data.author.handle 226 author_name = notification_data.author.display_name or author_handle 227 228 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 229 'correlation_id': correlation_id, 230 'author_handle': author_handle, 231 'author_name': author_name, 232 'mention_uri': uri, 233 'mention_text_length': len(mention_text), 234 'mention_preview': mention_text[:100] if mention_text else '' 235 }) 236 237 # Check if handle is in allowed list (if configured) 238 allowed_handles = get_config().get('bot.allowed_handles', []) 239 if allowed_handles and author_handle not in allowed_handles: 240 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (not in allowed_handles)") 241 return True # Remove from queue 242 243 # Retrieve the entire thread associated with the mention 244 try: 245 thread = atproto_client.app.bsky.feed.get_post_thread({ 246 'uri': uri, 247 'parent_height': 40, 248 'depth': 10 249 }) 250 except Exception as e: 251 error_str = str(e) 252 # Check if this is a NotFound error 253 if 'NotFound' in error_str or 'Post not found' in error_str: 254 logger.warning(f"Post not found for URI {uri}, removing from queue") 255 return True # Return True to remove from queue 256 elif 'InternalServerError' in error_str: 257 # Bluesky sometimes returns InternalServerError for deleted posts 258 # Verify if post actually exists using getRecord 259 try: 260 parts = uri.replace('at://', '').split('/') 261 repo, collection, rkey = parts[0], parts[1], parts[2] 262 atproto_client.com.atproto.repo.get_record({ 263 'repo': repo, 'collection': collection, 'rkey': rkey 264 }) 265 # Post exists, this is a real server error - re-raise 266 logger.error(f"Error fetching thread (post exists, server error): {e}") 267 raise 268 except Exception as verify_e: 269 if 'RecordNotFound' in str(verify_e) or 'not found' in str(verify_e).lower(): 270 logger.warning(f"Post deleted (verified via getRecord), removing from queue: {uri}") 271 return True # Remove from queue 272 # Some other verification error, re-raise original 273 logger.error(f"Error fetching thread: {e}") 274 raise 275 else: 276 # Re-raise other errors 277 logger.error(f"Error fetching thread: {e}") 278 raise 279 280 # Check thread length against configured maximum 281 max_thread_posts = get_config().get('bot.max_thread_posts', 0) 282 if max_thread_posts > 0: 283 thread_post_count = count_thread_posts(thread) 284 if thread_post_count > max_thread_posts: 285 logger.info(f"Thread too long ({thread_post_count} posts > {max_thread_posts} max), skipping this mention") 286 return True # Return True to remove from queue 287 288 # Get thread context as YAML string 289 logger.debug("Converting thread to YAML string") 290 try: 291 thread_context = thread_to_yaml_string(thread) 292 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 293 294 # Check if #voidstop appears anywhere in the thread 295 if "#voidstop" in thread_context.lower(): 296 logger.info("Found #voidstop in thread context, skipping this mention") 297 return True # Return True to remove from queue 298 299 # Also check the mention text directly 300 if "#voidstop" in mention_text.lower(): 301 logger.info("Found #voidstop in mention text, skipping this mention") 302 return True # Return True to remove from queue 303 304 # Create a more informative preview by extracting meaningful content 305 lines = thread_context.split('\n') 306 meaningful_lines = [] 307 308 for line in lines: 309 stripped = line.strip() 310 if not stripped: 311 continue 312 313 # Look for lines with actual content (not just structure) 314 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 315 meaningful_lines.append(line) 316 if len(meaningful_lines) >= 5: 317 break 318 319 if meaningful_lines: 320 preview = '\n'.join(meaningful_lines) 321 logger.debug(f"Thread content preview:\n{preview}") 322 else: 323 # If no content fields found, just show it's a thread structure 324 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 325 except Exception as yaml_error: 326 import traceback 327 logger.error(f"Error converting thread to YAML: {yaml_error}") 328 logger.error(f"Full traceback:\n{traceback.format_exc()}") 329 logger.error(f"Thread type: {type(thread)}") 330 if hasattr(thread, '__dict__'): 331 logger.error(f"Thread attributes: {thread.__dict__}") 332 # Try to continue with a simple context 333 thread_context = f"Error processing thread context: {str(yaml_error)}" 334 335 # Create a prompt for the Letta agent with thread context 336 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 337 338MOST RECENT POST: 339"{mention_text}" 340 341THREAD CONTEXT: 342```yaml 343{thread_context} 344``` 345 346If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call creates one post (max 300 characters). You may use multiple calls to create a thread if needed.""" 347 348 # Extract all handles from notification and thread data 349 all_handles = set() 350 all_handles.update(extract_handles_from_data(notification_data)) 351 all_handles.update(extract_handles_from_data(thread.model_dump())) 352 unique_handles = list(all_handles) 353 354 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 355 356 # Check if any handles are in known_bots list 357 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 358 import json 359 360 try: 361 # Check for known bots in thread 362 bot_check_result = check_known_bots(unique_handles, void_agent) 363 bot_check_data = json.loads(bot_check_result) 364 365 # TEMPORARILY DISABLED: Bot detection causing issues with normal users 366 # TODO: Re-enable after debugging why normal users are being flagged as bots 367 if False: # bot_check_data.get("bot_detected", False): 368 detected_bots = bot_check_data.get("detected_bots", []) 369 logger.info(f"Bot detected in thread: {detected_bots}") 370 371 # Decide whether to respond (10% chance) 372 if not should_respond_to_bot_thread(): 373 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 374 # Return False to keep in queue for potential later processing 375 return False 376 else: 377 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 378 else: 379 logger.debug("Bot detection disabled - processing all notifications") 380 381 except Exception as bot_check_error: 382 logger.warning(f"Error checking for bots: {bot_check_error}") 383 # Continue processing if bot check fails 384 385 # Get response from Letta agent 386 # Format with Unicode characters 387 title = f"MENTION FROM @{author_handle}" 388 print(f"\n{title}") 389 print(f" {'' * len(title)}") 390 # Indent the mention text 391 for line in mention_text.split('\n'): 392 print(f" {line}") 393 394 # Log prompt details to separate logger 395 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 396 397 # Log concise prompt info to main logger 398 thread_handles_count = len(unique_handles) 399 prompt_char_count = len(prompt) 400 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 401 402 try: 403 # Use streaming to avoid 524 timeout errors 404 message_stream = CLIENT.agents.messages.create_stream( 405 agent_id=void_agent.id, 406 messages=[{"role": "user", "content": prompt}], 407 stream_tokens=False, # Step streaming only (faster than token streaming) 408 max_steps=100 409 ) 410 411 # Collect the streaming response 412 all_messages = [] 413 for chunk in message_stream: 414 # Log condensed chunk info 415 if hasattr(chunk, 'message_type'): 416 if chunk.message_type == 'reasoning_message': 417 # Show full reasoning without truncation 418 if SHOW_REASONING: 419 # Format with Unicode characters 420 print("\n◆ Reasoning") 421 print(" ─────────") 422 # Indent reasoning lines 423 for line in chunk.reasoning.split('\n'): 424 print(f" {line}") 425 else: 426 # Default log format (only when --reasoning is used due to log level) 427 # Format with Unicode characters 428 print("\n◆ Reasoning") 429 print(" ─────────") 430 # Indent reasoning lines 431 for line in chunk.reasoning.split('\n'): 432 print(f" {line}") 433 434 # Create ATProto record for reasoning (unless in testing mode) 435 if not testing_mode and hasattr(chunk, 'reasoning'): 436 try: 437 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 438 except Exception as e: 439 logger.debug(f"Failed to create reasoning record: {e}") 440 elif chunk.message_type == 'tool_call_message': 441 # Parse tool arguments for better display 442 tool_name = chunk.tool_call.name 443 444 # Create ATProto record for tool call (unless in testing mode) 445 if not testing_mode: 446 try: 447 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 448 bsky_utils.create_tool_call_record( 449 atproto_client, 450 tool_name, 451 chunk.tool_call.arguments, 452 tool_call_id 453 ) 454 except Exception as e: 455 logger.debug(f"Failed to create tool call record: {e}") 456 457 try: 458 args = json.loads(chunk.tool_call.arguments) 459 # Format based on tool type 460 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 461 # Extract the text being posted 462 text = args.get('text', '') 463 if text: 464 # Format with Unicode characters 465 print("\n✎ Bluesky Post") 466 print(" ────────────") 467 # Indent post text 468 for line in text.split('\n'): 469 print(f" {line}") 470 else: 471 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 472 elif tool_name == 'archival_memory_search': 473 query = args.get('query', 'unknown') 474 global last_archival_query 475 last_archival_query = query 476 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 477 elif tool_name == 'archival_memory_insert': 478 content = args.get('content', '') 479 # Show the full content being inserted 480 log_with_panel(content, f"Tool call: {tool_name}", "blue") 481 elif tool_name == 'update_block': 482 label = args.get('label', 'unknown') 483 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 484 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 485 else: 486 # Generic display for other tools 487 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 488 if len(args_str) > 150: 489 args_str = args_str[:150] + "..." 490 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 491 except: 492 # Fallback to original format if parsing fails 493 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 494 elif chunk.message_type == 'tool_return_message': 495 # Enhanced tool result logging 496 tool_name = chunk.name 497 status = chunk.status 498 499 if status == 'success': 500 # Try to show meaningful result info based on tool type 501 if hasattr(chunk, 'tool_return') and chunk.tool_return: 502 result_str = str(chunk.tool_return) 503 if tool_name == 'archival_memory_search': 504 505 try: 506 # Handle both string and list formats 507 if isinstance(chunk.tool_return, str): 508 # The string format is: "([{...}, {...}], count)" 509 # We need to extract just the list part 510 if chunk.tool_return.strip(): 511 # Find the list part between the first [ and last ] 512 start_idx = chunk.tool_return.find('[') 513 end_idx = chunk.tool_return.rfind(']') 514 if start_idx != -1 and end_idx != -1: 515 list_str = chunk.tool_return[start_idx:end_idx+1] 516 # Use ast.literal_eval since this is Python literal syntax, not JSON 517 import ast 518 results = ast.literal_eval(list_str) 519 else: 520 logger.warning("Could not find list in archival_memory_search result") 521 results = [] 522 else: 523 logger.warning("Empty string returned from archival_memory_search") 524 results = [] 525 else: 526 # If it's already a list, use directly 527 results = chunk.tool_return 528 529 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 530 531 # Use the captured search query from the tool call 532 search_query = last_archival_query 533 534 # Combine all results into a single text block 535 content_text = "" 536 for i, entry in enumerate(results, 1): 537 timestamp = entry.get('timestamp', 'N/A') 538 content = entry.get('content', '') 539 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 540 541 # Format with Unicode characters 542 title = f"{search_query} ({len(results)} results)" 543 print(f"\n{title}") 544 print(f" {'' * len(title)}") 545 # Indent content text 546 for line in content_text.strip().split('\n'): 547 print(f" {line}") 548 549 except Exception as e: 550 logger.error(f"Error formatting archival memory results: {e}") 551 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 552 elif tool_name == 'add_post_to_bluesky_reply_thread': 553 # Just show success for bluesky posts, the text was already shown in tool call 554 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 555 elif tool_name == 'archival_memory_insert': 556 # Skip archival memory insert results (always returns None) 557 pass 558 elif tool_name == 'update_block': 559 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 560 else: 561 # Generic success with preview 562 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 563 log_with_panel(preview, f"Tool result: {tool_name}", "green") 564 else: 565 log_with_panel("Success", f"Tool result: {tool_name}", "green") 566 elif status == 'error': 567 # Show error details 568 if tool_name == 'add_post_to_bluesky_reply_thread': 569 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 570 log_with_panel(error_str, f"Bluesky Post ✗", "red") 571 elif tool_name == 'archival_memory_insert': 572 # Skip archival memory insert errors too 573 pass 574 else: 575 error_preview = "" 576 if hasattr(chunk, 'tool_return') and chunk.tool_return: 577 error_str = str(chunk.tool_return) 578 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 579 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 580 else: 581 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 582 else: 583 logger.info(f"Tool result: {tool_name} - {status}") 584 elif chunk.message_type == 'assistant_message': 585 # Format with Unicode characters 586 print("\n▶ Assistant Response") 587 print(" ──────────────────") 588 # Indent response text 589 for line in chunk.content.split('\n'): 590 print(f" {line}") 591 elif chunk.message_type == 'error_message': 592 # Dump full error object 593 logger.error(f"Agent error_message: {chunk}") 594 if hasattr(chunk, 'model_dump'): 595 logger.error(f"Agent error (dict): {chunk.model_dump()}") 596 elif hasattr(chunk, '__dict__'): 597 logger.error(f"Agent error (vars): {vars(chunk)}") 598 else: 599 # Filter out verbose message types 600 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 601 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 602 else: 603 logger.info(f"📦 Stream status: {chunk}") 604 605 # Log full chunk for debugging 606 logger.debug(f"Full streaming chunk: {chunk}") 607 all_messages.append(chunk) 608 if str(chunk) == 'done': 609 break 610 611 # Convert streaming response to standard format for compatibility 612 message_response = type('StreamingResponse', (), { 613 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 614 })() 615 except Exception as api_error: 616 import traceback 617 error_str = str(api_error) 618 logger.error(f"Letta API error: {api_error}") 619 logger.error(f"Error type: {type(api_error).__name__}") 620 logger.error(f"Full traceback:\n{traceback.format_exc()}") 621 logger.error(f"Mention text was: {mention_text}") 622 logger.error(f"Author: @{author_handle}") 623 logger.error(f"URI: {uri}") 624 625 626 # Try to extract more info from different error types 627 if hasattr(api_error, 'response'): 628 logger.error(f"Error response object exists") 629 if hasattr(api_error.response, 'text'): 630 logger.error(f"Response text: {api_error.response.text}") 631 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 632 try: 633 logger.error(f"Response JSON: {api_error.response.json()}") 634 except: 635 pass 636 637 # Check for specific error types 638 if hasattr(api_error, 'status_code'): 639 logger.error(f"API Status code: {api_error.status_code}") 640 if hasattr(api_error, 'body'): 641 logger.error(f"API Response body: {api_error.body}") 642 if hasattr(api_error, 'headers'): 643 logger.error(f"API Response headers: {api_error.headers}") 644 645 if api_error.status_code == 413: 646 logger.error("413 Payload Too Large - moving to errors directory") 647 return None # Move to errors directory - payload is too large to ever succeed 648 elif api_error.status_code == 524: 649 logger.error("524 error - timeout from Cloudflare, will retry later") 650 return False # Keep in queue for retry 651 652 # Check if error indicates we should remove from queue 653 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 654 logger.warning("Payload too large error, moving to errors directory") 655 return None # Move to errors directory - cannot be fixed by retry 656 elif 'status_code: 524' in error_str: 657 logger.warning("524 timeout error, keeping in queue for retry") 658 return False # Keep in queue for retry 659 660 raise 661 662 # Log successful response 663 logger.debug("Successfully received response from Letta API") 664 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 665 666 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 667 reply_candidates = [] 668 tool_call_results = {} # Map tool_call_id to status 669 ack_note = None # Track any note from annotate_ack tool 670 flagged_memories = [] # Track memories flagged for deletion 671 672 logger.debug(f"Processing {len(message_response.messages)} response messages...") 673 674 # First pass: collect tool return statuses 675 ignored_notification = False 676 ignore_reason = "" 677 ignore_category = "" 678 679 logger.debug(f"Processing {len(message_response.messages)} messages from agent") 680 681 for message in message_response.messages: 682 # Log detailed message attributes for debugging 683 msg_type = getattr(message, 'message_type', 'unknown') 684 has_tool_call_id = hasattr(message, 'tool_call_id') 685 has_status = hasattr(message, 'status') 686 has_tool_return = hasattr(message, 'tool_return') 687 688 logger.debug(f"Message type={msg_type}, has_tool_call_id={has_tool_call_id}, has_status={has_status}, has_tool_return={has_tool_return}") 689 690 # Tool return messages are identified by having tool_return attribute, tool_call_id, and status 691 if has_tool_call_id and has_status and has_tool_return: 692 logger.debug(f" -> tool_call_id={message.tool_call_id}, status={message.status}") 693 694 # Store the result for ANY tool that has a return - we'll match by tool_call_id later 695 tool_call_results[message.tool_call_id] = message.status 696 logger.debug(f"Stored tool result: {message.tool_call_id} -> {message.status}") 697 698 # Handle special processing for ignore_notification 699 if message.status == 'success': 700 result_str = str(message.tool_return) if message.tool_return else "" 701 if 'IGNORED_NOTIFICATION::' in result_str: 702 parts = result_str.split('::') 703 if len(parts) >= 3: 704 ignore_category = parts[1] 705 ignore_reason = parts[2] 706 ignored_notification = True 707 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 708 709 # Check for deprecated tool in tool call messages 710 elif hasattr(message, 'tool_call') and message.tool_call: 711 if message.tool_call.name == 'bluesky_reply': 712 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 713 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 714 logger.error("Update the agent's tools using register_tools.py") 715 # Export agent state before terminating 716 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 717 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 718 exit(1) 719 720 logger.debug(f"First pass complete. Collected {len(tool_call_results)} tool call results") 721 logger.debug(f"tool_call_results: {tool_call_results}") 722 723 # Second pass: process messages and check for successful tool calls 724 for i, message in enumerate(message_response.messages, 1): 725 # Log concise message info instead of full object 726 msg_type = getattr(message, 'message_type', 'unknown') 727 if hasattr(message, 'reasoning') and message.reasoning: 728 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 729 elif hasattr(message, 'tool_call') and message.tool_call: 730 tool_name = message.tool_call.name 731 logger.debug(f" {i}. {msg_type}: {tool_name}") 732 elif hasattr(message, 'tool_return'): 733 tool_name = getattr(message, 'name', 'unknown_tool') 734 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 735 status = getattr(message, 'status', 'unknown') 736 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 737 elif hasattr(message, 'text'): 738 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 739 else: 740 logger.debug(f" {i}. {msg_type}: <no content>") 741 742 # Check for halt_activity tool call 743 if hasattr(message, 'tool_call') and message.tool_call: 744 if message.tool_call.name == 'halt_activity': 745 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 746 try: 747 args = json.loads(message.tool_call.arguments) 748 reason = args.get('reason', 'Agent requested halt') 749 logger.info(f"Halt reason: {reason}") 750 except: 751 logger.info("Halt reason: <unable to parse>") 752 753 # Delete the queue file before terminating 754 if queue_filepath and queue_filepath.exists(): 755 queue_filepath.unlink() 756 logger.info(f"Deleted queue file: {queue_filepath.name}") 757 758 # Also mark as processed to avoid reprocessing 759 if NOTIFICATION_DB: 760 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed') 761 else: 762 processed_uris = load_processed_notifications() 763 processed_uris.add(notification_data.get('uri', '')) 764 save_processed_notifications(processed_uris) 765 766 # Export agent state before terminating 767 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 768 769 # Exit the program 770 logger.info("=== BOT TERMINATED BY AGENT ===") 771 exit(0) 772 773 # Check for deprecated bluesky_reply tool 774 if hasattr(message, 'tool_call') and message.tool_call: 775 if message.tool_call.name == 'bluesky_reply': 776 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 777 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 778 logger.error("Update the agent's tools using register_tools.py") 779 # Export agent state before terminating 780 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 781 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 782 exit(1) 783 784 # Collect annotate_ack tool calls 785 elif message.tool_call.name == 'annotate_ack': 786 try: 787 args = json.loads(message.tool_call.arguments) 788 note = args.get('note', '') 789 if note: 790 ack_note = note 791 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 792 except json.JSONDecodeError as e: 793 logger.error(f"Failed to parse annotate_ack arguments: {e}") 794 795 # Collect flag_archival_memory_for_deletion tool calls 796 elif message.tool_call.name == 'flag_archival_memory_for_deletion': 797 try: 798 args = json.loads(message.tool_call.arguments) 799 reason = args.get('reason', '') 800 memory_text = args.get('memory_text', '') 801 confirm = args.get('confirm', False) 802 803 # Only flag for deletion if confirmed and has all required fields 804 if confirm and memory_text and reason: 805 flagged_memories.append({ 806 'reason': reason, 807 'memory_text': memory_text 808 }) 809 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...") 810 elif not confirm: 811 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...") 812 elif not reason: 813 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...") 814 except json.JSONDecodeError as e: 815 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}") 816 817 # Collect archival_memory_insert tool calls for recording to AT Protocol 818 elif message.tool_call.name == 'archival_memory_insert': 819 try: 820 args = json.loads(message.tool_call.arguments) 821 content = args.get('content', '') 822 tags_str = args.get('tags', None) 823 824 # Parse tags from string representation if present 825 tags = None 826 if tags_str: 827 try: 828 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str 829 except json.JSONDecodeError: 830 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...") 831 832 if content: 833 # Create stream.thought.memory record 834 try: 835 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags) 836 if memory_result: 837 tags_info = f" ({len(tags)} tags)" if tags else "" 838 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}: {content[:100]}...") 839 else: 840 logger.warning(f"Failed to record archival memory to AT Protocol") 841 except Exception as e: 842 logger.error(f"Error creating memory record: {e}") 843 except json.JSONDecodeError as e: 844 logger.error(f"Failed to parse archival_memory_insert arguments: {e}") 845 846 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 847 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 848 tool_call_id = message.tool_call.tool_call_id 849 tool_status = tool_call_results.get(tool_call_id, 'unknown') 850 851 logger.debug(f"Found add_post_to_bluesky_reply_thread tool call: id={tool_call_id}, status={tool_status}") 852 logger.debug(f"Available tool_call_results: {tool_call_results}") 853 854 if tool_status == 'success': 855 try: 856 args = json.loads(message.tool_call.arguments) 857 reply_text = args.get('text', '') 858 reply_lang = args.get('lang', 'en-US') 859 860 if reply_text: # Only add if there's actual content 861 reply_candidates.append((reply_text, reply_lang)) 862 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 863 except json.JSONDecodeError as e: 864 logger.error(f"Failed to parse tool call arguments: {e}") 865 elif tool_status == 'error': 866 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 867 else: 868 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 869 logger.warning(f" Tool call ID '{tool_call_id}' not found in tool_call_results dict") 870 871 # Handle archival memory deletion if any were flagged (only if no halt was received) 872 if flagged_memories: 873 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion") 874 for flagged_memory in flagged_memories: 875 reason = flagged_memory['reason'] 876 memory_text = flagged_memory['memory_text'] 877 878 try: 879 # Search for passages with this exact text 880 logger.debug(f"Searching for passages matching: {memory_text[:100]}...") 881 passages = CLIENT.agents.passages.list( 882 agent_id=void_agent.id, 883 query=memory_text 884 ) 885 886 if not passages: 887 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...") 888 continue 889 890 # Delete all matching passages 891 deleted_count = 0 892 for passage in passages: 893 # Check if the passage text exactly matches (to avoid partial matches) 894 if hasattr(passage, 'text') and passage.text == memory_text: 895 try: 896 CLIENT.agents.passages.delete( 897 agent_id=void_agent.id, 898 passage_id=str(passage.id) 899 ) 900 deleted_count += 1 901 logger.debug(f"Deleted passage {passage.id}") 902 except Exception as delete_error: 903 logger.error(f"Failed to delete passage {passage.id}: {delete_error}") 904 905 if deleted_count > 0: 906 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...") 907 else: 908 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...") 909 910 except Exception as e: 911 logger.error(f"Error processing memory deletion: {e}") 912 913 # Check for conflicting tool calls 914 if reply_candidates and ignored_notification: 915 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 916 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 917 logger.warning("Item will be left in queue for manual review") 918 # Return False to keep in queue 919 return False 920 921 if reply_candidates: 922 # Aggregate reply posts into a thread 923 reply_messages = [] 924 reply_langs = [] 925 for text, lang in reply_candidates: 926 reply_messages.append(text) 927 reply_langs.append(lang) 928 929 # Use the first language for the entire thread (could be enhanced later) 930 reply_lang = reply_langs[0] if reply_langs else 'en-US' 931 932 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 933 934 # Display the generated reply thread 935 if len(reply_messages) == 1: 936 content = reply_messages[0] 937 title = f"Reply to @{author_handle}" 938 else: 939 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 940 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 941 942 # Format with Unicode characters 943 print(f"\n{title}") 944 print(f" {'' * len(title)}") 945 # Indent content lines 946 for line in content.split('\n'): 947 print(f" {line}") 948 949 # Send the reply(s) with language (unless in testing mode) 950 if testing_mode: 951 logger.info("TESTING MODE: Skipping actual Bluesky post") 952 response = True # Simulate success 953 else: 954 if len(reply_messages) == 1: 955 # Single reply - use existing function 956 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 957 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 958 response = bsky_utils.reply_to_notification( 959 client=atproto_client, 960 notification=notification_data, 961 reply_text=cleaned_text, 962 lang=reply_lang, 963 correlation_id=correlation_id 964 ) 965 else: 966 # Multiple replies - use new threaded function 967 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 968 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 969 response = bsky_utils.reply_with_thread_to_notification( 970 client=atproto_client, 971 notification=notification_data, 972 reply_messages=cleaned_messages, 973 lang=reply_lang, 974 correlation_id=correlation_id 975 ) 976 977 if response: 978 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={ 979 'correlation_id': correlation_id, 980 'author_handle': author_handle, 981 'reply_count': len(reply_messages) 982 }) 983 984 # Acknowledge the post we're replying to with stream.thought.ack 985 try: 986 post_uri = notification_data.get('uri') 987 post_cid = notification_data.get('cid') 988 989 if post_uri and post_cid: 990 ack_result = bsky_utils.acknowledge_post( 991 client=atproto_client, 992 post_uri=post_uri, 993 post_cid=post_cid, 994 note=ack_note 995 ) 996 if ack_result: 997 if ack_note: 998 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 999 else: 1000 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 1001 else: 1002 logger.warning(f"Failed to acknowledge post from @{author_handle}") 1003 else: 1004 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 1005 except Exception as e: 1006 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 1007 # Don't fail the entire operation if acknowledgment fails 1008 1009 return True 1010 else: 1011 logger.error(f"Failed to send reply to @{author_handle}") 1012 return False 1013 else: 1014 # Check if notification was explicitly ignored 1015 if ignored_notification: 1016 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={ 1017 'correlation_id': correlation_id, 1018 'author_handle': author_handle, 1019 'ignore_category': ignore_category 1020 }) 1021 return "ignored" 1022 else: 1023 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={ 1024 'correlation_id': correlation_id, 1025 'author_handle': author_handle 1026 }) 1027 return "no_reply" 1028 1029 except Exception as e: 1030 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={ 1031 'correlation_id': correlation_id, 1032 'error': str(e), 1033 'error_type': type(e).__name__, 1034 'author_handle': author_handle if 'author_handle' in locals() else 'unknown' 1035 }) 1036 return False 1037 1038 1039def notification_to_dict(notification): 1040 """Convert a notification object to a dictionary for JSON serialization.""" 1041 return { 1042 'uri': notification.uri, 1043 'cid': notification.cid, 1044 'reason': notification.reason, 1045 'is_read': notification.is_read, 1046 'indexed_at': notification.indexed_at, 1047 'author': { 1048 'handle': notification.author.handle, 1049 'display_name': notification.author.display_name, 1050 'did': notification.author.did 1051 }, 1052 'record': { 1053 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 1054 } 1055 } 1056 1057 1058def load_processed_notifications(): 1059 """Load the set of processed notification URIs from database.""" 1060 global NOTIFICATION_DB 1061 if NOTIFICATION_DB: 1062 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS) 1063 return set() 1064 1065 1066def save_processed_notifications(processed_set): 1067 """Save the set of processed notification URIs to database.""" 1068 # This is now handled by marking individual notifications in the DB 1069 # Keeping function for compatibility but it doesn't need to do anything 1070 pass 1071 1072 1073def save_notification_to_queue(notification, is_priority=None): 1074 """Save a notification to the queue directory with priority-based filename.""" 1075 try: 1076 global NOTIFICATION_DB 1077 1078 # Handle both notification objects and dicts 1079 if isinstance(notification, dict): 1080 notif_dict = notification 1081 notification_uri = notification.get('uri') 1082 else: 1083 notif_dict = notification_to_dict(notification) 1084 notification_uri = notification.uri 1085 1086 # Check if already processed (using database if available) 1087 if NOTIFICATION_DB: 1088 if NOTIFICATION_DB.is_processed(notification_uri): 1089 logger.debug(f"Notification already processed (DB): {notification_uri}") 1090 return False 1091 # Add to database - if this fails, don't queue the notification 1092 if not NOTIFICATION_DB.add_notification(notif_dict): 1093 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}") 1094 return False 1095 else: 1096 # Fall back to old JSON method 1097 processed_uris = load_processed_notifications() 1098 if notification_uri in processed_uris: 1099 logger.debug(f"Notification already processed: {notification_uri}") 1100 return False 1101 1102 # Create JSON string 1103 notif_json = json.dumps(notif_dict, sort_keys=True) 1104 1105 # Generate hash for filename (to avoid duplicates) 1106 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 1107 1108 # Determine priority based on author handle or explicit priority 1109 if is_priority is not None: 1110 priority_prefix = "0_" if is_priority else "1_" 1111 else: 1112 if isinstance(notification, dict): 1113 author_handle = notification.get('author', {}).get('handle', '') 1114 else: 1115 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 1116 # Prioritize cameron.pfiffer.org responses 1117 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 1118 1119 # Create filename with priority, timestamp and hash 1120 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 1121 reason = notif_dict.get('reason', 'unknown') 1122 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 1123 filepath = QUEUE_DIR / filename 1124 1125 # Check if this notification URI is already in the queue 1126 for existing_file in QUEUE_DIR.glob("*.json"): 1127 if existing_file.name == "processed_notifications.json": 1128 continue 1129 try: 1130 with open(existing_file, 'r') as f: 1131 existing_data = json.load(f) 1132 if existing_data.get('uri') == notification_uri: 1133 logger.debug(f"Notification already queued (URI: {notification_uri})") 1134 return False 1135 except: 1136 continue 1137 1138 # Write to file 1139 with open(filepath, 'w') as f: 1140 json.dump(notif_dict, f, indent=2) 1141 1142 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 1143 logger.info(f"Queued notification ({priority_label}): {filename}") 1144 return True 1145 1146 except Exception as e: 1147 logger.error(f"Error saving notification to queue: {e}") 1148 return False 1149 1150 1151def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 1152 """Load and process all notifications from the queue in priority order.""" 1153 try: 1154 # Get all JSON files in queue directory (excluding processed_notifications.json) 1155 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 1156 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1157 1158 # Filter out and delete like notifications immediately 1159 queue_files = [] 1160 likes_deleted = 0 1161 1162 for filepath in all_queue_files: 1163 try: 1164 with open(filepath, 'r') as f: 1165 notif_data = json.load(f) 1166 1167 # If it's a like, delete it immediately and don't process 1168 if notif_data.get('reason') == 'like': 1169 filepath.unlink() 1170 likes_deleted += 1 1171 logger.debug(f"Deleted like notification: {filepath.name}") 1172 else: 1173 queue_files.append(filepath) 1174 except Exception as e: 1175 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1176 queue_files.append(filepath) # Keep it in case it's valid 1177 1178 if likes_deleted > 0: 1179 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1180 1181 if not queue_files: 1182 return 1183 1184 logger.info(f"Processing {len(queue_files)} queued notifications") 1185 1186 # Log current statistics 1187 elapsed_time = time.time() - start_time 1188 total_messages = sum(message_counters.values()) 1189 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1190 1191 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1192 1193 for i, filepath in enumerate(queue_files, 1): 1194 # Determine if this is a priority notification 1195 is_priority = filepath.name.startswith("0_") 1196 1197 # Check for new notifications periodically during queue processing 1198 # Also check immediately after processing each priority item 1199 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) 1200 1201 # If we just processed a priority item, immediately check for new priority notifications 1202 if is_priority and i > 1: 1203 should_check_notifications = True 1204 1205 if should_check_notifications: 1206 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1207 try: 1208 # Fetch and queue new notifications without processing them 1209 new_count = fetch_and_queue_new_notifications(atproto_client) 1210 1211 if new_count > 0: 1212 logger.info(f"Added {new_count} new notifications to queue") 1213 # Reload the queue files to include the new items 1214 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1215 queue_files = updated_queue_files 1216 logger.info(f"Queue updated: now {len(queue_files)} total items") 1217 except Exception as e: 1218 logger.error(f"Error checking for new notifications: {e}") 1219 1220 priority_label = " [PRIORITY]" if is_priority else "" 1221 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") 1222 try: 1223 # Load notification data 1224 with open(filepath, 'r') as f: 1225 notif_data = json.load(f) 1226 1227 # Process based on type using dict data directly 1228 success = False 1229 if notif_data['reason'] == "mention": 1230 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1231 if success: 1232 message_counters['mentions'] += 1 1233 elif notif_data['reason'] == "reply": 1234 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1235 if success: 1236 message_counters['replies'] += 1 1237 elif notif_data['reason'] == "follow": 1238 # author_handle = notif_data['author']['handle'] 1239 # author_display_name = notif_data['author'].get('display_name', 'no display name') 1240 # follow_update = f"@{author_handle} ({author_display_name}) started following you." 1241 # follow_message = f"Update: {follow_update}" 1242 # logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") 1243 # CLIENT.agents.messages.create( 1244 # agent_id = void_agent.id, 1245 # messages = [{"role":"user", "content": follow_message}] 1246 # ) 1247 success = True # Follow updates are always successful 1248 # if success: 1249 # message_counters['follows'] += 1 1250 elif notif_data['reason'] == "repost": 1251 # Skip reposts silently 1252 success = True # Skip reposts but mark as successful to remove from queue 1253 if success: 1254 message_counters['reposts_skipped'] += 1 1255 elif notif_data['reason'] == "like": 1256 # Skip likes silently 1257 success = True # Skip likes but mark as successful to remove from queue 1258 if success: 1259 message_counters.setdefault('likes_skipped', 0) 1260 message_counters['likes_skipped'] += 1 1261 else: 1262 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1263 success = True # Remove unknown types from queue 1264 1265 # Handle file based on processing result 1266 if success: 1267 if testing_mode: 1268 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1269 else: 1270 filepath.unlink() 1271 logger.info(f"Successfully processed and removed: {filepath.name}") 1272 1273 # Mark as processed to avoid reprocessing 1274 if NOTIFICATION_DB: 1275 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed') 1276 else: 1277 processed_uris = load_processed_notifications() 1278 processed_uris.add(notif_data['uri']) 1279 save_processed_notifications(processed_uris) 1280 1281 elif success is None: # Special case for moving to error directory 1282 error_path = QUEUE_ERROR_DIR / filepath.name 1283 filepath.rename(error_path) 1284 logger.warning(f"Moved {filepath.name} to errors directory") 1285 1286 # Also mark as processed to avoid retrying 1287 if NOTIFICATION_DB: 1288 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1289 else: 1290 processed_uris = load_processed_notifications() 1291 processed_uris.add(notif_data['uri']) 1292 save_processed_notifications(processed_uris) 1293 1294 elif success == "no_reply": # Special case for moving to no_reply directory 1295 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1296 filepath.rename(no_reply_path) 1297 logger.info(f"Moved {filepath.name} to no_reply directory") 1298 1299 # Also mark as processed to avoid retrying 1300 if NOTIFICATION_DB: 1301 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1302 else: 1303 processed_uris = load_processed_notifications() 1304 processed_uris.add(notif_data['uri']) 1305 save_processed_notifications(processed_uris) 1306 1307 elif success == "ignored": # Special case for explicitly ignored notifications 1308 # For ignored notifications, we just delete them (not move to no_reply) 1309 filepath.unlink() 1310 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1311 1312 # Also mark as processed to avoid retrying 1313 if NOTIFICATION_DB: 1314 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1315 else: 1316 processed_uris = load_processed_notifications() 1317 processed_uris.add(notif_data['uri']) 1318 save_processed_notifications(processed_uris) 1319 1320 else: 1321 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1322 1323 except Exception as e: 1324 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1325 # Keep the file for retry later 1326 1327 except Exception as e: 1328 logger.error(f"Error loading queued notifications: {e}") 1329 1330 1331def fetch_and_queue_new_notifications(atproto_client): 1332 """Fetch new notifications and queue them without processing.""" 1333 try: 1334 global NOTIFICATION_DB 1335 1336 # Get current time for marking notifications as seen 1337 logger.debug("Getting current time for notification marking...") 1338 last_seen_at = atproto_client.get_current_time_iso() 1339 1340 # Get timestamp of last processed notification for filtering 1341 last_processed_time = None 1342 if NOTIFICATION_DB: 1343 last_processed_time = NOTIFICATION_DB.get_latest_processed_time() 1344 if last_processed_time: 1345 logger.debug(f"Last processed notification was at: {last_processed_time}") 1346 1347 # Fetch ALL notifications using pagination 1348 all_notifications = [] 1349 cursor = None 1350 page_count = 0 1351 max_pages = 20 # Safety limit to prevent infinite loops 1352 1353 while page_count < max_pages: 1354 try: 1355 # Fetch notifications page 1356 if cursor: 1357 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1358 params={'cursor': cursor, 'limit': 100} 1359 ) 1360 else: 1361 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1362 params={'limit': 100} 1363 ) 1364 1365 page_count += 1 1366 page_notifications = notifications_response.notifications 1367 1368 if not page_notifications: 1369 break 1370 1371 all_notifications.extend(page_notifications) 1372 1373 # Check if there are more pages 1374 cursor = getattr(notifications_response, 'cursor', None) 1375 if not cursor: 1376 break 1377 1378 except Exception as e: 1379 logger.error(f"Error fetching notifications page {page_count}: {e}") 1380 break 1381 1382 # Now process all fetched notifications 1383 new_count = 0 1384 if all_notifications: 1385 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API") 1386 1387 # Mark as seen first 1388 try: 1389 atproto_client.app.bsky.notification.update_seen( 1390 data={'seenAt': last_seen_at} 1391 ) 1392 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1393 except Exception as e: 1394 logger.error(f"Error marking notifications as seen: {e}") 1395 1396 # Debug counters 1397 skipped_read = 0 1398 skipped_likes = 0 1399 skipped_processed = 0 1400 skipped_old_timestamp = 0 1401 processed_uris = load_processed_notifications() 1402 1403 # Queue all new notifications (except likes) 1404 for notif in all_notifications: 1405 # Skip if older than last processed (when we have timestamp filtering) 1406 if last_processed_time and hasattr(notif, 'indexed_at'): 1407 if notif.indexed_at <= last_processed_time: 1408 skipped_old_timestamp += 1 1409 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})") 1410 continue 1411 1412 # Debug: Log is_read status but DON'T skip based on it 1413 if hasattr(notif, 'is_read') and notif.is_read: 1414 skipped_read += 1 1415 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}") 1416 1417 # Skip likes 1418 if hasattr(notif, 'reason') and notif.reason == 'like': 1419 skipped_likes += 1 1420 continue 1421 1422 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1423 1424 # Skip likes in dict form too 1425 if notif_dict.get('reason') == 'like': 1426 continue 1427 1428 # Check if already processed 1429 notif_uri = notif_dict.get('uri', '') 1430 if notif_uri in processed_uris: 1431 skipped_processed += 1 1432 logger.debug(f"Skipping already processed: {notif_uri}") 1433 continue 1434 1435 # Check if it's a priority notification 1436 is_priority = False 1437 1438 # Priority for cameron.pfiffer.org notifications 1439 author_handle = notif_dict.get('author', {}).get('handle', '') 1440 if author_handle == "cameron.pfiffer.org": 1441 is_priority = True 1442 1443 # Also check for priority keywords in mentions 1444 if notif_dict.get('reason') == 'mention': 1445 # Get the mention text to check for priority keywords 1446 record = notif_dict.get('record', {}) 1447 text = record.get('text', '') 1448 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1449 is_priority = True 1450 1451 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1452 new_count += 1 1453 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}") 1454 1455 # Log summary of filtering 1456 logger.info(f"📊 Notification processing summary:") 1457 logger.info(f" • Total fetched: {len(all_notifications)}") 1458 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)") 1459 logger.info(f" • Skipped (likes): {skipped_likes}") 1460 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}") 1461 logger.info(f" • Skipped (already processed): {skipped_processed}") 1462 logger.info(f" • Queued for processing: {new_count}") 1463 else: 1464 logger.debug("No new notifications to queue") 1465 1466 return new_count 1467 1468 except Exception as e: 1469 logger.error(f"Error fetching and queueing notifications: {e}") 1470 return 0 1471 1472 1473def process_notifications(void_agent, atproto_client, testing_mode=False): 1474 """Fetch new notifications, queue them, and process the queue.""" 1475 try: 1476 # Fetch and queue new notifications 1477 new_count = fetch_and_queue_new_notifications(atproto_client) 1478 1479 if new_count > 0: 1480 logger.info(f"Found {new_count} new notifications to process") 1481 1482 # Now process the entire queue (old + new notifications) 1483 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1484 1485 except Exception as e: 1486 logger.error(f"Error processing notifications: {e}") 1487 1488 1489def send_synthesis_message(client: Letta, agent_id: str, agent_name: str = "void", atproto_client=None) -> None: 1490 """ 1491 Send a synthesis message to the agent every 10 minutes. 1492 This prompts the agent to synthesize its recent experiences. 1493 1494 Args: 1495 client: Letta client 1496 agent_id: Agent ID to send synthesis to 1497 agent_name: Agent name for temporal block labels 1498 atproto_client: Optional AT Protocol client for posting synthesis results 1499 """ 1500 # Track attached temporal blocks for cleanup 1501 attached_temporal_labels = [] 1502 1503 try: 1504 logger.info("🧠 Preparing synthesis with temporal journal blocks") 1505 1506 # Attach temporal blocks before synthesis 1507 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id, agent_name) 1508 if not success: 1509 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway") 1510 1511 # Create synthesis prompt with agent-specific block names 1512 today = date.today() 1513 synthesis_prompt = f"""Time for synthesis. 1514 1515This is your periodic opportunity to reflect on recent experiences and update your memory. 1516 1517The following temporal journal blocks are temporarily available for this session: 1518- {agent_name}_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1519- {agent_name}_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1520- {agent_name}_year_{today.year}: This year's journal ({today.year}) 1521 1522You may use these blocks as you see fit. Synthesize your recent experiences into your memory as appropriate.""" 1523 1524 logger.info("🧠 Sending enhanced synthesis prompt to agent") 1525 1526 # Send synthesis message with streaming to show tool use 1527 message_stream = client.agents.messages.create_stream( 1528 agent_id=agent_id, 1529 messages=[{"role": "user", "content": synthesis_prompt}], 1530 stream_tokens=False, 1531 max_steps=100 1532 ) 1533 1534 # Track synthesis content for potential posting 1535 synthesis_posts = [] 1536 ack_note = None 1537 1538 # Process the streaming response 1539 for chunk in message_stream: 1540 if hasattr(chunk, 'message_type'): 1541 if chunk.message_type == 'reasoning_message': 1542 if SHOW_REASONING: 1543 print("\n◆ Reasoning") 1544 print(" ─────────") 1545 for line in chunk.reasoning.split('\n'): 1546 print(f" {line}") 1547 1548 # Create ATProto record for reasoning (if we have atproto client) 1549 if atproto_client and hasattr(chunk, 'reasoning'): 1550 try: 1551 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 1552 except Exception as e: 1553 logger.debug(f"Failed to create reasoning record during synthesis: {e}") 1554 elif chunk.message_type == 'tool_call_message': 1555 tool_name = chunk.tool_call.name 1556 1557 # Create ATProto record for tool call (if we have atproto client) 1558 if atproto_client: 1559 try: 1560 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 1561 bsky_utils.create_tool_call_record( 1562 atproto_client, 1563 tool_name, 1564 chunk.tool_call.arguments, 1565 tool_call_id 1566 ) 1567 except Exception as e: 1568 logger.debug(f"Failed to create tool call record during synthesis: {e}") 1569 try: 1570 args = json.loads(chunk.tool_call.arguments) 1571 if tool_name == 'archival_memory_search': 1572 query = args.get('query', 'unknown') 1573 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1574 elif tool_name == 'archival_memory_insert': 1575 content = args.get('content', '') 1576 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1577 1578 # Record archival memory insert to AT Protocol 1579 if atproto_client: 1580 try: 1581 tags_str = args.get('tags', None) 1582 tags = None 1583 if tags_str: 1584 try: 1585 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str 1586 except json.JSONDecodeError: 1587 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...") 1588 1589 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags) 1590 if memory_result: 1591 tags_info = f" ({len(tags)} tags)" if tags else "" 1592 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}") 1593 except Exception as e: 1594 logger.debug(f"Failed to create memory record during synthesis: {e}") 1595 elif tool_name == 'update_block': 1596 label = args.get('label', 'unknown') 1597 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1598 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1599 elif tool_name == 'annotate_ack': 1600 note = args.get('note', '') 1601 if note: 1602 ack_note = note 1603 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1604 elif tool_name == 'add_post_to_bluesky_reply_thread': 1605 text = args.get('text', '') 1606 synthesis_posts.append(text) 1607 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1608 else: 1609 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1610 if len(args_str) > 150: 1611 args_str = args_str[:150] + "..." 1612 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1613 except: 1614 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1615 elif chunk.message_type == 'tool_return_message': 1616 if chunk.status == 'success': 1617 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1618 else: 1619 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1620 elif chunk.message_type == 'assistant_message': 1621 print("\n▶ Synthesis Response") 1622 print(" ──────────────────") 1623 for line in chunk.content.split('\n'): 1624 print(f" {line}") 1625 elif chunk.message_type == 'error_message': 1626 # Dump full error object 1627 logger.error(f"Synthesis error_message: {chunk}") 1628 if hasattr(chunk, 'model_dump'): 1629 logger.error(f"Synthesis error (dict): {chunk.model_dump()}") 1630 elif hasattr(chunk, '__dict__'): 1631 logger.error(f"Synthesis error (vars): {vars(chunk)}") 1632 1633 if str(chunk) == 'done': 1634 break 1635 1636 logger.info("🧠 Synthesis message processed successfully") 1637 1638 # Handle synthesis acknowledgments if we have an atproto client 1639 if atproto_client and ack_note: 1640 try: 1641 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1642 if result: 1643 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1644 else: 1645 logger.warning("Failed to create synthesis acknowledgment") 1646 except Exception as e: 1647 logger.error(f"Error creating synthesis acknowledgment: {e}") 1648 1649 # Handle synthesis posts if any were generated 1650 if atproto_client and synthesis_posts: 1651 try: 1652 for post_text in synthesis_posts: 1653 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1654 response = bsky_utils.send_post(atproto_client, cleaned_text) 1655 if response: 1656 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1657 else: 1658 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1659 except Exception as e: 1660 logger.error(f"Error posting synthesis content: {e}") 1661 1662 except Exception as e: 1663 logger.error(f"Error sending synthesis message: {e}") 1664 finally: 1665 # Always detach temporal blocks after synthesis 1666 if attached_temporal_labels: 1667 logger.info("🧠 Detaching temporal journal blocks after synthesis") 1668 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels, agent_name) 1669 if not detach_success: 1670 logger.warning("Some temporal blocks may not have been detached properly") 1671 1672 1673def attach_temporal_blocks(client: Letta, agent_id: str, agent_name: str = "void") -> tuple: 1674 """ 1675 Attach temporal journal blocks (day, month, year) to the agent for synthesis. 1676 Creates blocks if they don't exist. 1677 1678 Args: 1679 client: Letta client 1680 agent_id: Agent ID 1681 agent_name: Agent name for prefixing block labels (prevents collision across agents) 1682 1683 Returns: 1684 Tuple of (success: bool, attached_labels: list) 1685 """ 1686 try: 1687 today = date.today() 1688 1689 # Generate temporal block labels with agent-specific prefix 1690 day_label = f"{agent_name}_day_{today.strftime('%Y_%m_%d')}" 1691 month_label = f"{agent_name}_month_{today.strftime('%Y_%m')}" 1692 year_label = f"{agent_name}_year_{today.year}" 1693 1694 temporal_labels = [day_label, month_label, year_label] 1695 attached_labels = [] 1696 1697 # Get current blocks attached to agent 1698 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1699 current_block_labels = {block.label for block in current_blocks} 1700 current_block_ids = {str(block.id) for block in current_blocks} 1701 1702 for label in temporal_labels: 1703 try: 1704 # Skip if already attached 1705 if label in current_block_labels: 1706 logger.debug(f"Temporal block already attached: {label}") 1707 attached_labels.append(label) 1708 continue 1709 1710 # Check if block exists globally 1711 blocks = client.blocks.list(label=label) 1712 1713 if blocks and len(blocks) > 0: 1714 block = blocks[0] 1715 # Check if already attached by ID 1716 if str(block.id) in current_block_ids: 1717 logger.debug(f"Temporal block already attached by ID: {label}") 1718 attached_labels.append(label) 1719 continue 1720 else: 1721 # Create new temporal block with appropriate header 1722 if "day" in label: 1723 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}" 1724 initial_content = f"{header}\n\nNo entries yet for today." 1725 elif "month" in label: 1726 header = f"# Monthly Journal - {today.strftime('%B %Y')}" 1727 initial_content = f"{header}\n\nNo entries yet for this month." 1728 else: # year 1729 header = f"# Yearly Journal - {today.year}" 1730 initial_content = f"{header}\n\nNo entries yet for this year." 1731 1732 block = client.blocks.create( 1733 label=label, 1734 value=initial_content, 1735 limit=10000 # Larger limit for journal blocks 1736 ) 1737 logger.info(f"Created new temporal block: {label}") 1738 1739 # Attach the block 1740 client.agents.blocks.attach( 1741 agent_id=agent_id, 1742 block_id=str(block.id) 1743 ) 1744 attached_labels.append(label) 1745 logger.info(f"Attached temporal block: {label}") 1746 1747 except Exception as e: 1748 # Check for duplicate constraint errors 1749 error_str = str(e) 1750 if "duplicate key value violates unique constraint" in error_str: 1751 logger.debug(f"Temporal block already attached (constraint): {label}") 1752 attached_labels.append(label) 1753 else: 1754 logger.warning(f"Failed to attach temporal block {label}: {e}") 1755 1756 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}") 1757 return True, attached_labels 1758 1759 except Exception as e: 1760 logger.error(f"Error attaching temporal blocks: {e}") 1761 return False, [] 1762 1763 1764def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None, agent_name: str = "void") -> bool: 1765 """ 1766 Detach temporal journal blocks from the agent after synthesis. 1767 1768 Args: 1769 client: Letta client 1770 agent_id: Agent ID 1771 labels_to_detach: Optional list of specific labels to detach. 1772 If None, detaches all temporal blocks for this agent. 1773 agent_name: Agent name for prefixing block labels (prevents collision across agents) 1774 1775 Returns: 1776 bool: Success status 1777 """ 1778 try: 1779 # If no specific labels provided, generate today's labels 1780 if labels_to_detach is None: 1781 today = date.today() 1782 labels_to_detach = [ 1783 f"{agent_name}_day_{today.strftime('%Y_%m_%d')}", 1784 f"{agent_name}_month_{today.strftime('%Y_%m')}", 1785 f"{agent_name}_year_{today.year}" 1786 ] 1787 1788 # Get current blocks and build label to ID mapping 1789 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1790 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 1791 1792 detached_count = 0 1793 for label in labels_to_detach: 1794 if label in block_label_to_id: 1795 try: 1796 client.agents.blocks.detach( 1797 agent_id=agent_id, 1798 block_id=block_label_to_id[label] 1799 ) 1800 detached_count += 1 1801 logger.debug(f"Detached temporal block: {label}") 1802 except Exception as e: 1803 logger.warning(f"Failed to detach temporal block {label}: {e}") 1804 else: 1805 logger.debug(f"Temporal block not attached: {label}") 1806 1807 logger.info(f"Detached {detached_count} temporal blocks") 1808 return True 1809 1810 except Exception as e: 1811 logger.error(f"Error detaching temporal blocks: {e}") 1812 return False 1813 1814 1815def main(): 1816 # Parse command line arguments 1817 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1818 parser.add_argument('--config', type=str, default='configs/config.yaml', help='Path to config file (default: configs/config.yaml)') 1819 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1820 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1821 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1822 # --rich option removed as we now use simple text formatting 1823 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1824 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1825 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1826 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1827 parser.add_argument('--debug', action='store_true', help='Enable debug logging') 1828 args = parser.parse_args() 1829 1830 # Initialize configuration with custom path 1831 global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB 1832 get_config(args.config) # Initialize the global config instance 1833 letta_config = get_letta_config() 1834 1835 # Initialize queue paths from config 1836 queue_config = get_queue_config() 1837 QUEUE_DIR = Path(queue_config['base_dir']) 1838 QUEUE_ERROR_DIR = Path(queue_config['error_dir']) 1839 QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir']) 1840 PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file']) 1841 1842 # Get bot name for logging 1843 bot_name = queue_config['bot_name'] 1844 1845 # Create queue directories 1846 QUEUE_DIR.mkdir(exist_ok=True) 1847 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 1848 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 1849 1850 # Create Letta client with configuration 1851 CLIENT_PARAMS = { 1852 'token': letta_config['api_key'], 1853 'timeout': letta_config['timeout'] 1854 } 1855 if letta_config.get('base_url'): 1856 CLIENT_PARAMS['base_url'] = letta_config['base_url'] 1857 CLIENT = Letta(**CLIENT_PARAMS) 1858 1859 # Configure logging based on command line arguments 1860 if args.simple_logs: 1861 log_format = f"{bot_name} - %(levelname)s - %(message)s" 1862 else: 1863 # Create custom formatter with symbols 1864 class SymbolFormatter(logging.Formatter): 1865 """Custom formatter that adds symbols for different log levels""" 1866 1867 SYMBOLS = { 1868 logging.DEBUG: '', 1869 logging.INFO: '', 1870 logging.WARNING: '', 1871 logging.ERROR: '', 1872 logging.CRITICAL: '' 1873 } 1874 1875 def __init__(self, bot_name): 1876 super().__init__() 1877 self.bot_name = bot_name 1878 1879 def format(self, record): 1880 # Get the symbol for this log level 1881 symbol = self.SYMBOLS.get(record.levelno, '') 1882 1883 # Format time as HH:MM:SS 1884 timestamp = self.formatTime(record, "%H:%M:%S") 1885 1886 # Build the formatted message 1887 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1888 1889 # Use vertical bar as separator 1890 parts = [symbol, timestamp, '', self.bot_name, '', level_name, '', record.getMessage()] 1891 1892 return ' '.join(parts) 1893 1894 # Reset logging configuration 1895 for handler in logging.root.handlers[:]: 1896 logging.root.removeHandler(handler) 1897 1898 # Create handler with custom formatter 1899 handler = logging.StreamHandler() 1900 if not args.simple_logs: 1901 handler.setFormatter(SymbolFormatter(bot_name)) 1902 else: 1903 handler.setFormatter(logging.Formatter(log_format)) 1904 1905 # Configure root logger 1906 logging.root.setLevel(logging.INFO) 1907 logging.root.addHandler(handler) 1908 1909 global logger, prompt_logger, console 1910 logger = logging.getLogger("void_bot") 1911 logger.setLevel(logging.DEBUG if args.debug else logging.INFO) 1912 1913 # Create a separate logger for prompts (set to WARNING to hide by default) 1914 prompt_logger = logging.getLogger("void_bot.prompts") 1915 if args.reasoning: 1916 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1917 else: 1918 prompt_logger.setLevel(logging.WARNING) # Hide by default 1919 1920 # Disable httpx logging completely 1921 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1922 1923 # Create Rich console for pretty printing 1924 # Console no longer used - simple text formatting 1925 1926 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1927 TESTING_MODE = args.test 1928 1929 # Store no-git flag globally for use in export_agent_state calls 1930 SKIP_GIT = args.no_git 1931 1932 # Store rich flag globally 1933 # Rich formatting no longer used 1934 1935 # Store reasoning flag globally 1936 SHOW_REASONING = args.reasoning 1937 1938 if TESTING_MODE: 1939 logger.info("=== RUNNING IN TESTING MODE ===") 1940 logger.info(" - No messages will be sent to Bluesky") 1941 logger.info(" - Queue files will not be deleted") 1942 logger.info(" - Notifications will not be marked as seen") 1943 print("\n") 1944 1945 # Check for synthesis-only mode 1946 SYNTHESIS_ONLY = args.synthesis_only 1947 if SYNTHESIS_ONLY: 1948 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1949 logger.info(" - Only synthesis messages will be sent") 1950 logger.info(" - No notification processing") 1951 logger.info(" - No Bluesky client needed") 1952 print("\n") 1953 """Main bot loop that continuously monitors for notifications.""" 1954 global start_time 1955 start_time = time.time() 1956 logger.info("=== STARTING VOID BOT ===") 1957 void_agent = initialize_void() 1958 logger.info(f"Void agent initialized: {void_agent.id}") 1959 1960 # Initialize notification database with config-based path 1961 logger.info("Initializing notification database...") 1962 NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path']) 1963 1964 # Migrate from old JSON format if it exists 1965 if PROCESSED_NOTIFICATIONS_FILE.exists(): 1966 logger.info("Found old processed_notifications.json, migrating to database...") 1967 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE)) 1968 1969 # Log database stats 1970 db_stats = NOTIFICATION_DB.get_stats() 1971 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}") 1972 1973 # Clean up old records 1974 NOTIFICATION_DB.cleanup_old_records(days=7) 1975 1976 # Ensure correct tools are attached for Bluesky 1977 logger.info("Configuring tools for Bluesky platform...") 1978 try: 1979 from tool_manager import ensure_platform_tools 1980 ensure_platform_tools('bluesky', void_agent.id) 1981 except Exception as e: 1982 logger.error(f"Failed to configure platform tools: {e}") 1983 logger.warning("Continuing with existing tool configuration") 1984 1985 # Check if agent has required tools 1986 if hasattr(void_agent, 'tools') and void_agent.tools: 1987 tool_names = [tool.name for tool in void_agent.tools] 1988 # Check for bluesky-related tools 1989 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1990 if not bluesky_tools: 1991 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1992 else: 1993 logger.warning("Agent has no tools registered!") 1994 1995 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1996 if not SYNTHESIS_ONLY: 1997 atproto_client = bsky_utils.default_login() 1998 logger.info("Connected to Bluesky") 1999 else: 2000 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 2001 if not args.test: 2002 atproto_client = bsky_utils.default_login() 2003 logger.info("Connected to Bluesky (for synthesis acks/posts)") 2004 else: 2005 atproto_client = None 2006 logger.info("Skipping Bluesky connection (test mode)") 2007 2008 # Configure intervals 2009 CLEANUP_INTERVAL = args.cleanup_interval 2010 SYNTHESIS_INTERVAL = args.synthesis_interval 2011 2012 # Synthesis-only mode 2013 if SYNTHESIS_ONLY: 2014 if SYNTHESIS_INTERVAL <= 0: 2015 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 2016 return 2017 2018 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2019 2020 while True: 2021 try: 2022 # Send synthesis message immediately on first run 2023 logger.info("🧠 Sending synthesis message") 2024 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client) 2025 2026 # Wait for next interval 2027 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 2028 sleep(SYNTHESIS_INTERVAL) 2029 2030 except KeyboardInterrupt: 2031 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 2032 break 2033 except Exception as e: 2034 logger.error(f"Error in synthesis loop: {e}") 2035 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 2036 sleep(SYNTHESIS_INTERVAL) 2037 2038 # Normal mode with notification processing 2039 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 2040 2041 cycle_count = 0 2042 2043 if CLEANUP_INTERVAL > 0: 2044 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 2045 else: 2046 logger.info("User block cleanup disabled") 2047 2048 if SYNTHESIS_INTERVAL > 0: 2049 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2050 else: 2051 logger.info("Synthesis messages disabled") 2052 2053 while True: 2054 try: 2055 cycle_count += 1 2056 process_notifications(void_agent, atproto_client, TESTING_MODE) 2057 2058 # Check if synthesis interval has passed 2059 if SYNTHESIS_INTERVAL > 0: 2060 current_time = time.time() 2061 global last_synthesis_time 2062 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 2063 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 2064 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client) 2065 last_synthesis_time = current_time 2066 2067 # Run periodic cleanup every N cycles 2068 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 2069 # Check if autofollow is enabled and sync followers 2070 from config_loader import get_bluesky_config 2071 bluesky_config = get_bluesky_config() 2072 if bluesky_config.get('autofollow', False): 2073 logger.info("🔄 Syncing followers (autofollow enabled)") 2074 try: 2075 sync_result = bsky_utils.sync_followers(atproto_client, dry_run=False) 2076 if 'error' in sync_result: 2077 logger.error(f"Autofollow failed: {sync_result['error']}") 2078 else: 2079 if sync_result['newly_followed']: 2080 logger.info(f"✓ Followed {len(sync_result['newly_followed'])} new users: {', '.join(sync_result['newly_followed'])}") 2081 else: 2082 logger.debug(f"No new followers to follow back ({sync_result['followers_count']} followers, {sync_result['following_count']} following)") 2083 if sync_result.get('errors'): 2084 logger.warning(f"Some follows failed: {len(sync_result['errors'])} errors") 2085 except Exception as e: 2086 logger.error(f"Error during autofollow sync: {e}") 2087 2088 # Also check database health when doing cleanup 2089 if NOTIFICATION_DB: 2090 db_stats = NOTIFICATION_DB.get_stats() 2091 pending = db_stats.get('status_pending', 0) 2092 errors = db_stats.get('status_error', 0) 2093 2094 if pending > 50: 2095 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)") 2096 if errors > 20: 2097 logger.warning(f"⚠️ Queue health check: {errors} error notifications") 2098 2099 # Periodic cleanup of old records 2100 if cycle_count % (CLEANUP_INTERVAL * 10) == 0: # Every 100 cycles 2101 logger.info("Running database cleanup of old records...") 2102 NOTIFICATION_DB.cleanup_old_records(days=7) 2103 2104 # Log cycle completion with stats 2105 elapsed_time = time.time() - start_time 2106 total_messages = sum(message_counters.values()) 2107 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2108 2109 if total_messages > 0: 2110 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 2111 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 2112 2113 except KeyboardInterrupt: 2114 # Final stats 2115 elapsed_time = time.time() - start_time 2116 total_messages = sum(message_counters.values()) 2117 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2118 2119 logger.info("=== BOT STOPPED BY USER ===") 2120 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 2121 logger.info(f" - {message_counters['mentions']} mentions") 2122 logger.info(f" - {message_counters['replies']} replies") 2123 logger.info(f" - {message_counters['follows']} follows") 2124 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 2125 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 2126 2127 # Close database connection 2128 if NOTIFICATION_DB: 2129 logger.info("Closing database connection...") 2130 NOTIFICATION_DB.close() 2131 2132 break 2133 except Exception as e: 2134 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 2135 logger.error(f"Error details: {e}") 2136 # Wait a bit longer on errors 2137 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 2138 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 2139 2140 2141if __name__ == "__main__": 2142 main()