a digital person for bluesky
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20from config_loader import get_letta_config, get_config, get_queue_config 21 22import bsky_utils 23from tools.blocks import attach_user_blocks, detach_user_blocks 24from datetime import date 25from notification_db import NotificationDB 26 27def extract_handles_from_data(data): 28 """Recursively extract all unique handles from nested data structure.""" 29 handles = set() 30 31 def _extract_recursive(obj): 32 if isinstance(obj, dict): 33 # Check if this dict has a 'handle' key 34 if 'handle' in obj: 35 handles.add(obj['handle']) 36 # Recursively check all values 37 for value in obj.values(): 38 _extract_recursive(value) 39 elif isinstance(obj, list): 40 # Recursively check all list items 41 for item in obj: 42 _extract_recursive(item) 43 44 _extract_recursive(data) 45 return list(handles) 46 47# Logging will be configured after argument parsing 48logger = None 49prompt_logger = None 50# Simple text formatting (Rich no longer used) 51SHOW_REASONING = False 52last_archival_query = "archival memory search" 53 54def log_with_panel(message, title=None, border_color="white"): 55 """Log a message with Unicode box-drawing characters""" 56 if title: 57 # Map old color names to appropriate symbols 58 symbol_map = { 59 "blue": "", # Tool calls 60 "green": "", # Success/completion 61 "yellow": "", # Reasoning 62 "red": "", # Errors 63 "white": "", # Default/mentions 64 "cyan": "", # Posts 65 } 66 symbol = symbol_map.get(border_color, "") 67 68 print(f"\n{symbol} {title}") 69 print(f" {'' * len(title)}") 70 # Indent message lines 71 for line in message.split('\n'): 72 print(f" {line}") 73 else: 74 print(message) 75 76 77# Load Letta configuration from config.yaml (will be initialized later with custom path if provided) 78letta_config = None 79CLIENT = None 80 81# Notification check delay 82FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 83 84# Check for new notifications every N queue items 85CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 86 87# Queue paths (will be initialized from config in main()) 88QUEUE_DIR = None 89QUEUE_ERROR_DIR = None 90QUEUE_NO_REPLY_DIR = None 91PROCESSED_NOTIFICATIONS_FILE = None 92 93# Maximum number of processed notifications to track 94MAX_PROCESSED_NOTIFICATIONS = 10000 95 96# Message tracking counters 97message_counters = defaultdict(int) 98start_time = time.time() 99 100# Testing mode flag 101TESTING_MODE = False 102 103# Skip git operations flag 104SKIP_GIT = False 105 106# Synthesis message tracking 107last_synthesis_time = time.time() 108 109# Database for notification tracking 110NOTIFICATION_DB = None 111 112def export_agent_state(client, agent, skip_git=False): 113 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 114 try: 115 # Confirm export with user unless git is being skipped 116 if not skip_git: 117 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 118 if response not in ['y', 'yes']: 119 logger.info("Agent export cancelled by user.") 120 return 121 else: 122 logger.info("Exporting agent state (git staging disabled)") 123 124 # Create directories if they don't exist 125 os.makedirs("agent_archive", exist_ok=True) 126 os.makedirs("agents", exist_ok=True) 127 128 # Export agent data 129 logger.info(f"Exporting agent {agent.id}. This takes some time...") 130 agent_data = client.agents.export_file(agent_id=agent.id) 131 132 # Save timestamped archive copy 133 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 134 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 135 with open(archive_file, 'w', encoding='utf-8') as f: 136 json.dump(agent_data, f, indent=2, ensure_ascii=False) 137 138 # Save current agent state 139 current_file = os.path.join("agents", "void.af") 140 with open(current_file, 'w', encoding='utf-8') as f: 141 json.dump(agent_data, f, indent=2, ensure_ascii=False) 142 143 logger.info(f"Agent exported to {archive_file} and {current_file}") 144 145 # Git add only the current agent file (archive is ignored) unless skip_git is True 146 if not skip_git: 147 try: 148 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 149 logger.info("Added current agent file to git staging") 150 except subprocess.CalledProcessError as e: 151 logger.warning(f"Failed to git add agent file: {e}") 152 153 except Exception as e: 154 logger.error(f"Failed to export agent: {e}") 155 156def initialize_void(): 157 logger.info("Starting void agent initialization...") 158 159 # Get the configured void agent by ID 160 logger.info("Loading void agent from config...") 161 agent_id = letta_config['agent_id'] 162 163 try: 164 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 165 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 166 except Exception as e: 167 logger.error(f"Failed to load void agent {agent_id}: {e}") 168 logger.error("Please ensure the agent_id in config.yaml is correct") 169 raise e 170 171 # Export agent state 172 logger.info("Exporting agent state...") 173 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 174 175 # Log agent details 176 logger.info(f"Void agent details - ID: {void_agent.id}") 177 logger.info(f"Agent name: {void_agent.name}") 178 if hasattr(void_agent, 'llm_config'): 179 logger.info(f"Agent model: {void_agent.llm_config.model}") 180 if hasattr(void_agent, 'project_id') and void_agent.project_id: 181 logger.info(f"Agent project_id: {void_agent.project_id}") 182 if hasattr(void_agent, 'tools'): 183 logger.info(f"Agent has {len(void_agent.tools)} tools") 184 for tool in void_agent.tools[:3]: # Show first 3 tools 185 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 186 187 return void_agent 188 189 190def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 191 """Process a mention and generate a reply using the Letta agent. 192 193 Args: 194 void_agent: The Letta agent instance 195 atproto_client: The AT Protocol client 196 notification_data: The notification data dictionary 197 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 198 199 Returns: 200 True: Successfully processed, remove from queue 201 False: Failed but retryable, keep in queue 202 None: Failed with non-retryable error, move to errors directory 203 "no_reply": No reply was generated, move to no_reply directory 204 """ 205 import uuid 206 207 # Generate correlation ID for tracking this notification through the pipeline 208 correlation_id = str(uuid.uuid4())[:8] 209 210 try: 211 logger.info(f"[{correlation_id}] Starting process_mention", extra={ 212 'correlation_id': correlation_id, 213 'notification_type': type(notification_data).__name__ 214 }) 215 216 # Handle both dict and object inputs for backwards compatibility 217 if isinstance(notification_data, dict): 218 uri = notification_data['uri'] 219 mention_text = notification_data.get('record', {}).get('text', '') 220 author_handle = notification_data['author']['handle'] 221 author_name = notification_data['author'].get('display_name') or author_handle 222 else: 223 # Legacy object access 224 uri = notification_data.uri 225 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 226 author_handle = notification_data.author.handle 227 author_name = notification_data.author.display_name or author_handle 228 229 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 230 'correlation_id': correlation_id, 231 'author_handle': author_handle, 232 'author_name': author_name, 233 'mention_uri': uri, 234 'mention_text_length': len(mention_text), 235 'mention_preview': mention_text[:100] if mention_text else '' 236 }) 237 238 # Retrieve the entire thread associated with the mention 239 try: 240 thread = atproto_client.app.bsky.feed.get_post_thread({ 241 'uri': uri, 242 'parent_height': 40, 243 'depth': 10 244 }) 245 except Exception as e: 246 error_str = str(e) 247 # Check if this is a NotFound error 248 if 'NotFound' in error_str or 'Post not found' in error_str: 249 logger.warning(f"Post not found for URI {uri}, removing from queue") 250 return True # Return True to remove from queue 251 else: 252 # Re-raise other errors 253 logger.error(f"Error fetching thread: {e}") 254 raise 255 256 # Get thread context as YAML string 257 logger.debug("Converting thread to YAML string") 258 try: 259 thread_context = thread_to_yaml_string(thread) 260 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 261 262 # Check if #voidstop appears anywhere in the thread 263 if "#voidstop" in thread_context.lower(): 264 logger.info("Found #voidstop in thread context, skipping this mention") 265 return True # Return True to remove from queue 266 267 # Also check the mention text directly 268 if "#voidstop" in mention_text.lower(): 269 logger.info("Found #voidstop in mention text, skipping this mention") 270 return True # Return True to remove from queue 271 272 # Create a more informative preview by extracting meaningful content 273 lines = thread_context.split('\n') 274 meaningful_lines = [] 275 276 for line in lines: 277 stripped = line.strip() 278 if not stripped: 279 continue 280 281 # Look for lines with actual content (not just structure) 282 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 283 meaningful_lines.append(line) 284 if len(meaningful_lines) >= 5: 285 break 286 287 if meaningful_lines: 288 preview = '\n'.join(meaningful_lines) 289 logger.debug(f"Thread content preview:\n{preview}") 290 else: 291 # If no content fields found, just show it's a thread structure 292 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 293 except Exception as yaml_error: 294 import traceback 295 logger.error(f"Error converting thread to YAML: {yaml_error}") 296 logger.error(f"Full traceback:\n{traceback.format_exc()}") 297 logger.error(f"Thread type: {type(thread)}") 298 if hasattr(thread, '__dict__'): 299 logger.error(f"Thread attributes: {thread.__dict__}") 300 # Try to continue with a simple context 301 thread_context = f"Error processing thread context: {str(yaml_error)}" 302 303 # Create a prompt for the Letta agent with thread context 304 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 305 306MOST RECENT POST (the mention you're responding to): 307"{mention_text}" 308 309FULL THREAD CONTEXT: 310```yaml 311{thread_context} 312``` 313 314The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 315 316To reply, use the add_post_to_bluesky_reply_thread tool: 317- Each call creates one post (max 300 characters) 318- For most responses, a single call is sufficient 319- Only use multiple calls for threaded replies when: 320 * The topic requires extended explanation that cannot fit in 300 characters 321 * You're explicitly asked for a detailed/long response 322 * The conversation naturally benefits from a structured multi-part answer 323- Avoid unnecessary threads - be concise when possible""" 324 325 # Extract all handles from notification and thread data 326 all_handles = set() 327 all_handles.update(extract_handles_from_data(notification_data)) 328 all_handles.update(extract_handles_from_data(thread.model_dump())) 329 unique_handles = list(all_handles) 330 331 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 332 333 # Check if any handles are in known_bots list 334 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 335 import json 336 337 try: 338 # Check for known bots in thread 339 bot_check_result = check_known_bots(unique_handles, void_agent) 340 bot_check_data = json.loads(bot_check_result) 341 342 # TEMPORARILY DISABLED: Bot detection causing issues with normal users 343 # TODO: Re-enable after debugging why normal users are being flagged as bots 344 if False: # bot_check_data.get("bot_detected", False): 345 detected_bots = bot_check_data.get("detected_bots", []) 346 logger.info(f"Bot detected in thread: {detected_bots}") 347 348 # Decide whether to respond (10% chance) 349 if not should_respond_to_bot_thread(): 350 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 351 # Return False to keep in queue for potential later processing 352 return False 353 else: 354 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 355 else: 356 logger.debug("Bot detection disabled - processing all notifications") 357 358 except Exception as bot_check_error: 359 logger.warning(f"Error checking for bots: {bot_check_error}") 360 # Continue processing if bot check fails 361 362 # Attach user blocks before agent call 363 attached_handles = [] 364 if unique_handles: 365 try: 366 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 367 attach_result = attach_user_blocks(unique_handles, void_agent) 368 attached_handles = unique_handles # Track successfully attached handles 369 logger.debug(f"Attach result: {attach_result}") 370 except Exception as attach_error: 371 logger.warning(f"Failed to attach user blocks: {attach_error}") 372 # Continue without user blocks rather than failing completely 373 374 # Get response from Letta agent 375 # Format with Unicode characters 376 title = f"MENTION FROM @{author_handle}" 377 print(f"\n{title}") 378 print(f" {'' * len(title)}") 379 # Indent the mention text 380 for line in mention_text.split('\n'): 381 print(f" {line}") 382 383 # Log prompt details to separate logger 384 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 385 386 # Log concise prompt info to main logger 387 thread_handles_count = len(unique_handles) 388 prompt_char_count = len(prompt) 389 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 390 391 try: 392 # Use streaming to avoid 524 timeout errors 393 message_stream = CLIENT.agents.messages.create_stream( 394 agent_id=void_agent.id, 395 messages=[{"role": "user", "content": prompt}], 396 stream_tokens=False, # Step streaming only (faster than token streaming) 397 max_steps=100 398 ) 399 400 # Collect the streaming response 401 all_messages = [] 402 for chunk in message_stream: 403 # Log condensed chunk info 404 if hasattr(chunk, 'message_type'): 405 if chunk.message_type == 'reasoning_message': 406 # Show full reasoning without truncation 407 if SHOW_REASONING: 408 # Format with Unicode characters 409 print("\n◆ Reasoning") 410 print(" ─────────") 411 # Indent reasoning lines 412 for line in chunk.reasoning.split('\n'): 413 print(f" {line}") 414 else: 415 # Default log format (only when --reasoning is used due to log level) 416 # Format with Unicode characters 417 print("\n◆ Reasoning") 418 print(" ─────────") 419 # Indent reasoning lines 420 for line in chunk.reasoning.split('\n'): 421 print(f" {line}") 422 423 # Create ATProto record for reasoning (unless in testing mode) 424 if not testing_mode and hasattr(chunk, 'reasoning'): 425 try: 426 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 427 except Exception as e: 428 logger.debug(f"Failed to create reasoning record: {e}") 429 elif chunk.message_type == 'tool_call_message': 430 # Parse tool arguments for better display 431 tool_name = chunk.tool_call.name 432 433 # Create ATProto record for tool call (unless in testing mode) 434 if not testing_mode: 435 try: 436 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 437 bsky_utils.create_tool_call_record( 438 atproto_client, 439 tool_name, 440 chunk.tool_call.arguments, 441 tool_call_id 442 ) 443 except Exception as e: 444 logger.debug(f"Failed to create tool call record: {e}") 445 446 try: 447 args = json.loads(chunk.tool_call.arguments) 448 # Format based on tool type 449 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 450 # Extract the text being posted 451 text = args.get('text', '') 452 if text: 453 # Format with Unicode characters 454 print("\n✎ Bluesky Post") 455 print(" ────────────") 456 # Indent post text 457 for line in text.split('\n'): 458 print(f" {line}") 459 else: 460 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 461 elif tool_name == 'archival_memory_search': 462 query = args.get('query', 'unknown') 463 global last_archival_query 464 last_archival_query = query 465 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 466 elif tool_name == 'archival_memory_insert': 467 content = args.get('content', '') 468 # Show the full content being inserted 469 log_with_panel(content, f"Tool call: {tool_name}", "blue") 470 elif tool_name == 'update_block': 471 label = args.get('label', 'unknown') 472 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 473 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 474 else: 475 # Generic display for other tools 476 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 477 if len(args_str) > 150: 478 args_str = args_str[:150] + "..." 479 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 480 except: 481 # Fallback to original format if parsing fails 482 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 483 elif chunk.message_type == 'tool_return_message': 484 # Enhanced tool result logging 485 tool_name = chunk.name 486 status = chunk.status 487 488 if status == 'success': 489 # Try to show meaningful result info based on tool type 490 if hasattr(chunk, 'tool_return') and chunk.tool_return: 491 result_str = str(chunk.tool_return) 492 if tool_name == 'archival_memory_search': 493 494 try: 495 # Handle both string and list formats 496 if isinstance(chunk.tool_return, str): 497 # The string format is: "([{...}, {...}], count)" 498 # We need to extract just the list part 499 if chunk.tool_return.strip(): 500 # Find the list part between the first [ and last ] 501 start_idx = chunk.tool_return.find('[') 502 end_idx = chunk.tool_return.rfind(']') 503 if start_idx != -1 and end_idx != -1: 504 list_str = chunk.tool_return[start_idx:end_idx+1] 505 # Use ast.literal_eval since this is Python literal syntax, not JSON 506 import ast 507 results = ast.literal_eval(list_str) 508 else: 509 logger.warning("Could not find list in archival_memory_search result") 510 results = [] 511 else: 512 logger.warning("Empty string returned from archival_memory_search") 513 results = [] 514 else: 515 # If it's already a list, use directly 516 results = chunk.tool_return 517 518 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 519 520 # Use the captured search query from the tool call 521 search_query = last_archival_query 522 523 # Combine all results into a single text block 524 content_text = "" 525 for i, entry in enumerate(results, 1): 526 timestamp = entry.get('timestamp', 'N/A') 527 content = entry.get('content', '') 528 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 529 530 # Format with Unicode characters 531 title = f"{search_query} ({len(results)} results)" 532 print(f"\n{title}") 533 print(f" {'' * len(title)}") 534 # Indent content text 535 for line in content_text.strip().split('\n'): 536 print(f" {line}") 537 538 except Exception as e: 539 logger.error(f"Error formatting archival memory results: {e}") 540 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 541 elif tool_name == 'add_post_to_bluesky_reply_thread': 542 # Just show success for bluesky posts, the text was already shown in tool call 543 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 544 elif tool_name == 'archival_memory_insert': 545 # Skip archival memory insert results (always returns None) 546 pass 547 elif tool_name == 'update_block': 548 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 549 else: 550 # Generic success with preview 551 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 552 log_with_panel(preview, f"Tool result: {tool_name}", "green") 553 else: 554 log_with_panel("Success", f"Tool result: {tool_name}", "green") 555 elif status == 'error': 556 # Show error details 557 if tool_name == 'add_post_to_bluesky_reply_thread': 558 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 559 log_with_panel(error_str, f"Bluesky Post ✗", "red") 560 elif tool_name == 'archival_memory_insert': 561 # Skip archival memory insert errors too 562 pass 563 else: 564 error_preview = "" 565 if hasattr(chunk, 'tool_return') and chunk.tool_return: 566 error_str = str(chunk.tool_return) 567 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 568 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 569 else: 570 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 571 else: 572 logger.info(f"Tool result: {tool_name} - {status}") 573 elif chunk.message_type == 'assistant_message': 574 # Format with Unicode characters 575 print("\n▶ Assistant Response") 576 print(" ──────────────────") 577 # Indent response text 578 for line in chunk.content.split('\n'): 579 print(f" {line}") 580 else: 581 # Filter out verbose message types 582 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 583 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 584 else: 585 logger.info(f"📦 Stream status: {chunk}") 586 587 # Log full chunk for debugging 588 logger.debug(f"Full streaming chunk: {chunk}") 589 all_messages.append(chunk) 590 if str(chunk) == 'done': 591 break 592 593 # Convert streaming response to standard format for compatibility 594 message_response = type('StreamingResponse', (), { 595 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 596 })() 597 except Exception as api_error: 598 import traceback 599 error_str = str(api_error) 600 logger.error(f"Letta API error: {api_error}") 601 logger.error(f"Error type: {type(api_error).__name__}") 602 logger.error(f"Full traceback:\n{traceback.format_exc()}") 603 logger.error(f"Mention text was: {mention_text}") 604 logger.error(f"Author: @{author_handle}") 605 logger.error(f"URI: {uri}") 606 607 608 # Try to extract more info from different error types 609 if hasattr(api_error, 'response'): 610 logger.error(f"Error response object exists") 611 if hasattr(api_error.response, 'text'): 612 logger.error(f"Response text: {api_error.response.text}") 613 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 614 try: 615 logger.error(f"Response JSON: {api_error.response.json()}") 616 except: 617 pass 618 619 # Check for specific error types 620 if hasattr(api_error, 'status_code'): 621 logger.error(f"API Status code: {api_error.status_code}") 622 if hasattr(api_error, 'body'): 623 logger.error(f"API Response body: {api_error.body}") 624 if hasattr(api_error, 'headers'): 625 logger.error(f"API Response headers: {api_error.headers}") 626 627 if api_error.status_code == 413: 628 logger.error("413 Payload Too Large - moving to errors directory") 629 return None # Move to errors directory - payload is too large to ever succeed 630 elif api_error.status_code == 524: 631 logger.error("524 error - timeout from Cloudflare, will retry later") 632 return False # Keep in queue for retry 633 634 # Check if error indicates we should remove from queue 635 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 636 logger.warning("Payload too large error, moving to errors directory") 637 return None # Move to errors directory - cannot be fixed by retry 638 elif 'status_code: 524' in error_str: 639 logger.warning("524 timeout error, keeping in queue for retry") 640 return False # Keep in queue for retry 641 642 raise 643 644 # Log successful response 645 logger.debug("Successfully received response from Letta API") 646 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 647 648 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 649 reply_candidates = [] 650 tool_call_results = {} # Map tool_call_id to status 651 ack_note = None # Track any note from annotate_ack tool 652 flagged_memories = [] # Track memories flagged for deletion 653 654 logger.debug(f"Processing {len(message_response.messages)} response messages...") 655 656 # First pass: collect tool return statuses 657 ignored_notification = False 658 ignore_reason = "" 659 ignore_category = "" 660 661 for message in message_response.messages: 662 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 663 if message.name == 'add_post_to_bluesky_reply_thread': 664 tool_call_results[message.tool_call_id] = message.status 665 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 666 elif message.name == 'flag_archival_memory_for_deletion': 667 tool_call_results[message.tool_call_id] = message.status 668 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 669 elif message.name == 'ignore_notification': 670 # Check if the tool was successful 671 if hasattr(message, 'tool_return') and message.status == 'success': 672 # Parse the return value to extract category and reason 673 result_str = str(message.tool_return) 674 if 'IGNORED_NOTIFICATION::' in result_str: 675 parts = result_str.split('::') 676 if len(parts) >= 3: 677 ignore_category = parts[1] 678 ignore_reason = parts[2] 679 ignored_notification = True 680 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 681 elif message.name == 'bluesky_reply': 682 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 683 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 684 logger.error("Update the agent's tools using register_tools.py") 685 # Export agent state before terminating 686 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 687 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 688 exit(1) 689 690 # Second pass: process messages and check for successful tool calls 691 for i, message in enumerate(message_response.messages, 1): 692 # Log concise message info instead of full object 693 msg_type = getattr(message, 'message_type', 'unknown') 694 if hasattr(message, 'reasoning') and message.reasoning: 695 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 696 elif hasattr(message, 'tool_call') and message.tool_call: 697 tool_name = message.tool_call.name 698 logger.debug(f" {i}. {msg_type}: {tool_name}") 699 elif hasattr(message, 'tool_return'): 700 tool_name = getattr(message, 'name', 'unknown_tool') 701 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 702 status = getattr(message, 'status', 'unknown') 703 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 704 elif hasattr(message, 'text'): 705 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 706 else: 707 logger.debug(f" {i}. {msg_type}: <no content>") 708 709 # Check for halt_activity tool call 710 if hasattr(message, 'tool_call') and message.tool_call: 711 if message.tool_call.name == 'halt_activity': 712 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 713 try: 714 args = json.loads(message.tool_call.arguments) 715 reason = args.get('reason', 'Agent requested halt') 716 logger.info(f"Halt reason: {reason}") 717 except: 718 logger.info("Halt reason: <unable to parse>") 719 720 # Delete the queue file before terminating 721 if queue_filepath and queue_filepath.exists(): 722 queue_filepath.unlink() 723 logger.info(f"Deleted queue file: {queue_filepath.name}") 724 725 # Also mark as processed to avoid reprocessing 726 if NOTIFICATION_DB: 727 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed') 728 else: 729 processed_uris = load_processed_notifications() 730 processed_uris.add(notification_data.get('uri', '')) 731 save_processed_notifications(processed_uris) 732 733 # Export agent state before terminating 734 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 735 736 # Exit the program 737 logger.info("=== BOT TERMINATED BY AGENT ===") 738 exit(0) 739 740 # Check for deprecated bluesky_reply tool 741 if hasattr(message, 'tool_call') and message.tool_call: 742 if message.tool_call.name == 'bluesky_reply': 743 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 744 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 745 logger.error("Update the agent's tools using register_tools.py") 746 # Export agent state before terminating 747 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 748 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 749 exit(1) 750 751 # Collect annotate_ack tool calls 752 elif message.tool_call.name == 'annotate_ack': 753 try: 754 args = json.loads(message.tool_call.arguments) 755 note = args.get('note', '') 756 if note: 757 ack_note = note 758 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 759 except json.JSONDecodeError as e: 760 logger.error(f"Failed to parse annotate_ack arguments: {e}") 761 762 # Collect flag_archival_memory_for_deletion tool calls 763 elif message.tool_call.name == 'flag_archival_memory_for_deletion': 764 try: 765 args = json.loads(message.tool_call.arguments) 766 reason = args.get('reason', '') 767 memory_text = args.get('memory_text', '') 768 confirm = args.get('confirm', False) 769 770 # Only flag for deletion if confirmed and has all required fields 771 if confirm and memory_text and reason: 772 flagged_memories.append({ 773 'reason': reason, 774 'memory_text': memory_text 775 }) 776 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...") 777 elif not confirm: 778 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...") 779 elif not reason: 780 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...") 781 except json.JSONDecodeError as e: 782 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}") 783 784 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 785 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 786 tool_call_id = message.tool_call.tool_call_id 787 tool_status = tool_call_results.get(tool_call_id, 'unknown') 788 789 if tool_status == 'success': 790 try: 791 args = json.loads(message.tool_call.arguments) 792 reply_text = args.get('text', '') 793 reply_lang = args.get('lang', 'en-US') 794 795 if reply_text: # Only add if there's actual content 796 reply_candidates.append((reply_text, reply_lang)) 797 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 798 except json.JSONDecodeError as e: 799 logger.error(f"Failed to parse tool call arguments: {e}") 800 elif tool_status == 'error': 801 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 802 else: 803 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 804 805 # Handle archival memory deletion if any were flagged (only if no halt was received) 806 if flagged_memories: 807 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion") 808 for flagged_memory in flagged_memories: 809 reason = flagged_memory['reason'] 810 memory_text = flagged_memory['memory_text'] 811 812 try: 813 # Search for passages with this exact text 814 logger.debug(f"Searching for passages matching: {memory_text[:100]}...") 815 passages = CLIENT.agents.passages.list( 816 agent_id=void_agent.id, 817 query=memory_text 818 ) 819 820 if not passages: 821 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...") 822 continue 823 824 # Delete all matching passages 825 deleted_count = 0 826 for passage in passages: 827 # Check if the passage text exactly matches (to avoid partial matches) 828 if hasattr(passage, 'text') and passage.text == memory_text: 829 try: 830 CLIENT.agents.passages.delete( 831 agent_id=void_agent.id, 832 passage_id=str(passage.id) 833 ) 834 deleted_count += 1 835 logger.debug(f"Deleted passage {passage.id}") 836 except Exception as delete_error: 837 logger.error(f"Failed to delete passage {passage.id}: {delete_error}") 838 839 if deleted_count > 0: 840 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...") 841 else: 842 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...") 843 844 except Exception as e: 845 logger.error(f"Error processing memory deletion: {e}") 846 847 # Check for conflicting tool calls 848 if reply_candidates and ignored_notification: 849 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 850 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 851 logger.warning("Item will be left in queue for manual review") 852 # Return False to keep in queue 853 return False 854 855 if reply_candidates: 856 # Aggregate reply posts into a thread 857 reply_messages = [] 858 reply_langs = [] 859 for text, lang in reply_candidates: 860 reply_messages.append(text) 861 reply_langs.append(lang) 862 863 # Use the first language for the entire thread (could be enhanced later) 864 reply_lang = reply_langs[0] if reply_langs else 'en-US' 865 866 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 867 868 # Display the generated reply thread 869 if len(reply_messages) == 1: 870 content = reply_messages[0] 871 title = f"Reply to @{author_handle}" 872 else: 873 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 874 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 875 876 # Format with Unicode characters 877 print(f"\n{title}") 878 print(f" {'' * len(title)}") 879 # Indent content lines 880 for line in content.split('\n'): 881 print(f" {line}") 882 883 # Send the reply(s) with language (unless in testing mode) 884 if testing_mode: 885 logger.info("TESTING MODE: Skipping actual Bluesky post") 886 response = True # Simulate success 887 else: 888 if len(reply_messages) == 1: 889 # Single reply - use existing function 890 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 891 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 892 response = bsky_utils.reply_to_notification( 893 client=atproto_client, 894 notification=notification_data, 895 reply_text=cleaned_text, 896 lang=reply_lang, 897 correlation_id=correlation_id 898 ) 899 else: 900 # Multiple replies - use new threaded function 901 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 902 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 903 response = bsky_utils.reply_with_thread_to_notification( 904 client=atproto_client, 905 notification=notification_data, 906 reply_messages=cleaned_messages, 907 lang=reply_lang, 908 correlation_id=correlation_id 909 ) 910 911 if response: 912 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={ 913 'correlation_id': correlation_id, 914 'author_handle': author_handle, 915 'reply_count': len(reply_messages) 916 }) 917 918 # Acknowledge the post we're replying to with stream.thought.ack 919 try: 920 post_uri = notification_data.get('uri') 921 post_cid = notification_data.get('cid') 922 923 if post_uri and post_cid: 924 ack_result = bsky_utils.acknowledge_post( 925 client=atproto_client, 926 post_uri=post_uri, 927 post_cid=post_cid, 928 note=ack_note 929 ) 930 if ack_result: 931 if ack_note: 932 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 933 else: 934 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 935 else: 936 logger.warning(f"Failed to acknowledge post from @{author_handle}") 937 else: 938 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 939 except Exception as e: 940 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 941 # Don't fail the entire operation if acknowledgment fails 942 943 return True 944 else: 945 logger.error(f"Failed to send reply to @{author_handle}") 946 return False 947 else: 948 # Check if notification was explicitly ignored 949 if ignored_notification: 950 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={ 951 'correlation_id': correlation_id, 952 'author_handle': author_handle, 953 'ignore_category': ignore_category 954 }) 955 return "ignored" 956 else: 957 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={ 958 'correlation_id': correlation_id, 959 'author_handle': author_handle 960 }) 961 return "no_reply" 962 963 except Exception as e: 964 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={ 965 'correlation_id': correlation_id, 966 'error': str(e), 967 'error_type': type(e).__name__, 968 'author_handle': author_handle if 'author_handle' in locals() else 'unknown' 969 }) 970 return False 971 finally: 972 # Detach user blocks after agent response (success or failure) 973 if 'attached_handles' in locals() and attached_handles: 974 try: 975 logger.info(f"Detaching user blocks for handles: {attached_handles}") 976 detach_result = detach_user_blocks(attached_handles, void_agent) 977 logger.debug(f"Detach result: {detach_result}") 978 except Exception as detach_error: 979 logger.warning(f"Failed to detach user blocks: {detach_error}") 980 981 982def notification_to_dict(notification): 983 """Convert a notification object to a dictionary for JSON serialization.""" 984 return { 985 'uri': notification.uri, 986 'cid': notification.cid, 987 'reason': notification.reason, 988 'is_read': notification.is_read, 989 'indexed_at': notification.indexed_at, 990 'author': { 991 'handle': notification.author.handle, 992 'display_name': notification.author.display_name, 993 'did': notification.author.did 994 }, 995 'record': { 996 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 997 } 998 } 999 1000 1001def load_processed_notifications(): 1002 """Load the set of processed notification URIs from database.""" 1003 global NOTIFICATION_DB 1004 if NOTIFICATION_DB: 1005 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS) 1006 return set() 1007 1008 1009def save_processed_notifications(processed_set): 1010 """Save the set of processed notification URIs to database.""" 1011 # This is now handled by marking individual notifications in the DB 1012 # Keeping function for compatibility but it doesn't need to do anything 1013 pass 1014 1015 1016def save_notification_to_queue(notification, is_priority=None): 1017 """Save a notification to the queue directory with priority-based filename.""" 1018 try: 1019 global NOTIFICATION_DB 1020 1021 # Handle both notification objects and dicts 1022 if isinstance(notification, dict): 1023 notif_dict = notification 1024 notification_uri = notification.get('uri') 1025 else: 1026 notif_dict = notification_to_dict(notification) 1027 notification_uri = notification.uri 1028 1029 # Check if already processed (using database if available) 1030 if NOTIFICATION_DB: 1031 if NOTIFICATION_DB.is_processed(notification_uri): 1032 logger.debug(f"Notification already processed (DB): {notification_uri}") 1033 return False 1034 # Add to database - if this fails, don't queue the notification 1035 if not NOTIFICATION_DB.add_notification(notif_dict): 1036 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}") 1037 return False 1038 else: 1039 # Fall back to old JSON method 1040 processed_uris = load_processed_notifications() 1041 if notification_uri in processed_uris: 1042 logger.debug(f"Notification already processed: {notification_uri}") 1043 return False 1044 1045 # Create JSON string 1046 notif_json = json.dumps(notif_dict, sort_keys=True) 1047 1048 # Generate hash for filename (to avoid duplicates) 1049 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 1050 1051 # Determine priority based on author handle or explicit priority 1052 if is_priority is not None: 1053 priority_prefix = "0_" if is_priority else "1_" 1054 else: 1055 if isinstance(notification, dict): 1056 author_handle = notification.get('author', {}).get('handle', '') 1057 else: 1058 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 1059 # Prioritize cameron.pfiffer.org responses 1060 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 1061 1062 # Create filename with priority, timestamp and hash 1063 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 1064 reason = notif_dict.get('reason', 'unknown') 1065 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 1066 filepath = QUEUE_DIR / filename 1067 1068 # Check if this notification URI is already in the queue 1069 for existing_file in QUEUE_DIR.glob("*.json"): 1070 if existing_file.name == "processed_notifications.json": 1071 continue 1072 try: 1073 with open(existing_file, 'r') as f: 1074 existing_data = json.load(f) 1075 if existing_data.get('uri') == notification_uri: 1076 logger.debug(f"Notification already queued (URI: {notification_uri})") 1077 return False 1078 except: 1079 continue 1080 1081 # Write to file 1082 with open(filepath, 'w') as f: 1083 json.dump(notif_dict, f, indent=2) 1084 1085 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 1086 logger.info(f"Queued notification ({priority_label}): {filename}") 1087 return True 1088 1089 except Exception as e: 1090 logger.error(f"Error saving notification to queue: {e}") 1091 return False 1092 1093 1094def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 1095 """Load and process all notifications from the queue in priority order.""" 1096 try: 1097 # Get all JSON files in queue directory (excluding processed_notifications.json) 1098 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 1099 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1100 1101 # Filter out and delete like notifications immediately 1102 queue_files = [] 1103 likes_deleted = 0 1104 1105 for filepath in all_queue_files: 1106 try: 1107 with open(filepath, 'r') as f: 1108 notif_data = json.load(f) 1109 1110 # If it's a like, delete it immediately and don't process 1111 if notif_data.get('reason') == 'like': 1112 filepath.unlink() 1113 likes_deleted += 1 1114 logger.debug(f"Deleted like notification: {filepath.name}") 1115 else: 1116 queue_files.append(filepath) 1117 except Exception as e: 1118 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1119 queue_files.append(filepath) # Keep it in case it's valid 1120 1121 if likes_deleted > 0: 1122 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1123 1124 if not queue_files: 1125 return 1126 1127 logger.info(f"Processing {len(queue_files)} queued notifications") 1128 1129 # Log current statistics 1130 elapsed_time = time.time() - start_time 1131 total_messages = sum(message_counters.values()) 1132 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1133 1134 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1135 1136 for i, filepath in enumerate(queue_files, 1): 1137 # Determine if this is a priority notification 1138 is_priority = filepath.name.startswith("0_") 1139 1140 # Check for new notifications periodically during queue processing 1141 # Also check immediately after processing each priority item 1142 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) 1143 1144 # If we just processed a priority item, immediately check for new priority notifications 1145 if is_priority and i > 1: 1146 should_check_notifications = True 1147 1148 if should_check_notifications: 1149 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1150 try: 1151 # Fetch and queue new notifications without processing them 1152 new_count = fetch_and_queue_new_notifications(atproto_client) 1153 1154 if new_count > 0: 1155 logger.info(f"Added {new_count} new notifications to queue") 1156 # Reload the queue files to include the new items 1157 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1158 queue_files = updated_queue_files 1159 logger.info(f"Queue updated: now {len(queue_files)} total items") 1160 except Exception as e: 1161 logger.error(f"Error checking for new notifications: {e}") 1162 1163 priority_label = " [PRIORITY]" if is_priority else "" 1164 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") 1165 try: 1166 # Load notification data 1167 with open(filepath, 'r') as f: 1168 notif_data = json.load(f) 1169 1170 # Process based on type using dict data directly 1171 success = False 1172 if notif_data['reason'] == "mention": 1173 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1174 if success: 1175 message_counters['mentions'] += 1 1176 elif notif_data['reason'] == "reply": 1177 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1178 if success: 1179 message_counters['replies'] += 1 1180 elif notif_data['reason'] == "follow": 1181 author_handle = notif_data['author']['handle'] 1182 author_display_name = notif_data['author'].get('display_name', 'no display name') 1183 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1184 follow_message = f"Update: {follow_update}" 1185 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") 1186 CLIENT.agents.messages.create( 1187 agent_id = void_agent.id, 1188 messages = [{"role":"user", "content": follow_message}] 1189 ) 1190 success = True # Follow updates are always successful 1191 if success: 1192 message_counters['follows'] += 1 1193 elif notif_data['reason'] == "repost": 1194 # Skip reposts silently 1195 success = True # Skip reposts but mark as successful to remove from queue 1196 if success: 1197 message_counters['reposts_skipped'] += 1 1198 elif notif_data['reason'] == "like": 1199 # Skip likes silently 1200 success = True # Skip likes but mark as successful to remove from queue 1201 if success: 1202 message_counters.setdefault('likes_skipped', 0) 1203 message_counters['likes_skipped'] += 1 1204 else: 1205 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1206 success = True # Remove unknown types from queue 1207 1208 # Handle file based on processing result 1209 if success: 1210 if testing_mode: 1211 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1212 else: 1213 filepath.unlink() 1214 logger.info(f"Successfully processed and removed: {filepath.name}") 1215 1216 # Mark as processed to avoid reprocessing 1217 if NOTIFICATION_DB: 1218 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed') 1219 else: 1220 processed_uris = load_processed_notifications() 1221 processed_uris.add(notif_data['uri']) 1222 save_processed_notifications(processed_uris) 1223 1224 elif success is None: # Special case for moving to error directory 1225 error_path = QUEUE_ERROR_DIR / filepath.name 1226 filepath.rename(error_path) 1227 logger.warning(f"Moved {filepath.name} to errors directory") 1228 1229 # Also mark as processed to avoid retrying 1230 if NOTIFICATION_DB: 1231 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1232 else: 1233 processed_uris = load_processed_notifications() 1234 processed_uris.add(notif_data['uri']) 1235 save_processed_notifications(processed_uris) 1236 1237 elif success == "no_reply": # Special case for moving to no_reply directory 1238 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1239 filepath.rename(no_reply_path) 1240 logger.info(f"Moved {filepath.name} to no_reply directory") 1241 1242 # Also mark as processed to avoid retrying 1243 if NOTIFICATION_DB: 1244 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1245 else: 1246 processed_uris = load_processed_notifications() 1247 processed_uris.add(notif_data['uri']) 1248 save_processed_notifications(processed_uris) 1249 1250 elif success == "ignored": # Special case for explicitly ignored notifications 1251 # For ignored notifications, we just delete them (not move to no_reply) 1252 filepath.unlink() 1253 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1254 1255 # Also mark as processed to avoid retrying 1256 if NOTIFICATION_DB: 1257 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error') 1258 else: 1259 processed_uris = load_processed_notifications() 1260 processed_uris.add(notif_data['uri']) 1261 save_processed_notifications(processed_uris) 1262 1263 else: 1264 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1265 1266 except Exception as e: 1267 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1268 # Keep the file for retry later 1269 1270 except Exception as e: 1271 logger.error(f"Error loading queued notifications: {e}") 1272 1273 1274def fetch_and_queue_new_notifications(atproto_client): 1275 """Fetch new notifications and queue them without processing.""" 1276 try: 1277 global NOTIFICATION_DB 1278 1279 # Get current time for marking notifications as seen 1280 logger.debug("Getting current time for notification marking...") 1281 last_seen_at = atproto_client.get_current_time_iso() 1282 1283 # Get timestamp of last processed notification for filtering 1284 last_processed_time = None 1285 if NOTIFICATION_DB: 1286 last_processed_time = NOTIFICATION_DB.get_latest_processed_time() 1287 if last_processed_time: 1288 logger.debug(f"Last processed notification was at: {last_processed_time}") 1289 1290 # Fetch ALL notifications using pagination 1291 all_notifications = [] 1292 cursor = None 1293 page_count = 0 1294 max_pages = 20 # Safety limit to prevent infinite loops 1295 1296 while page_count < max_pages: 1297 try: 1298 # Fetch notifications page 1299 if cursor: 1300 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1301 params={'cursor': cursor, 'limit': 100} 1302 ) 1303 else: 1304 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1305 params={'limit': 100} 1306 ) 1307 1308 page_count += 1 1309 page_notifications = notifications_response.notifications 1310 1311 if not page_notifications: 1312 break 1313 1314 all_notifications.extend(page_notifications) 1315 1316 # Check if there are more pages 1317 cursor = getattr(notifications_response, 'cursor', None) 1318 if not cursor: 1319 break 1320 1321 except Exception as e: 1322 logger.error(f"Error fetching notifications page {page_count}: {e}") 1323 break 1324 1325 # Now process all fetched notifications 1326 new_count = 0 1327 if all_notifications: 1328 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API") 1329 1330 # Mark as seen first 1331 try: 1332 atproto_client.app.bsky.notification.update_seen( 1333 data={'seenAt': last_seen_at} 1334 ) 1335 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1336 except Exception as e: 1337 logger.error(f"Error marking notifications as seen: {e}") 1338 1339 # Debug counters 1340 skipped_read = 0 1341 skipped_likes = 0 1342 skipped_processed = 0 1343 skipped_old_timestamp = 0 1344 processed_uris = load_processed_notifications() 1345 1346 # Queue all new notifications (except likes) 1347 for notif in all_notifications: 1348 # Skip if older than last processed (when we have timestamp filtering) 1349 if last_processed_time and hasattr(notif, 'indexed_at'): 1350 if notif.indexed_at <= last_processed_time: 1351 skipped_old_timestamp += 1 1352 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})") 1353 continue 1354 1355 # Debug: Log is_read status but DON'T skip based on it 1356 if hasattr(notif, 'is_read') and notif.is_read: 1357 skipped_read += 1 1358 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}") 1359 1360 # Skip likes 1361 if hasattr(notif, 'reason') and notif.reason == 'like': 1362 skipped_likes += 1 1363 continue 1364 1365 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1366 1367 # Skip likes in dict form too 1368 if notif_dict.get('reason') == 'like': 1369 continue 1370 1371 # Check if already processed 1372 notif_uri = notif_dict.get('uri', '') 1373 if notif_uri in processed_uris: 1374 skipped_processed += 1 1375 logger.debug(f"Skipping already processed: {notif_uri}") 1376 continue 1377 1378 # Check if it's a priority notification 1379 is_priority = False 1380 1381 # Priority for cameron.pfiffer.org notifications 1382 author_handle = notif_dict.get('author', {}).get('handle', '') 1383 if author_handle == "cameron.pfiffer.org": 1384 is_priority = True 1385 1386 # Also check for priority keywords in mentions 1387 if notif_dict.get('reason') == 'mention': 1388 # Get the mention text to check for priority keywords 1389 record = notif_dict.get('record', {}) 1390 text = record.get('text', '') 1391 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1392 is_priority = True 1393 1394 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1395 new_count += 1 1396 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}") 1397 1398 # Log summary of filtering 1399 logger.info(f"📊 Notification processing summary:") 1400 logger.info(f" • Total fetched: {len(all_notifications)}") 1401 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)") 1402 logger.info(f" • Skipped (likes): {skipped_likes}") 1403 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}") 1404 logger.info(f" • Skipped (already processed): {skipped_processed}") 1405 logger.info(f" • Queued for processing: {new_count}") 1406 else: 1407 logger.debug("No new notifications to queue") 1408 1409 return new_count 1410 1411 except Exception as e: 1412 logger.error(f"Error fetching and queueing notifications: {e}") 1413 return 0 1414 1415 1416def process_notifications(void_agent, atproto_client, testing_mode=False): 1417 """Fetch new notifications, queue them, and process the queue.""" 1418 try: 1419 # Fetch and queue new notifications 1420 new_count = fetch_and_queue_new_notifications(atproto_client) 1421 1422 if new_count > 0: 1423 logger.info(f"Found {new_count} new notifications to process") 1424 1425 # Now process the entire queue (old + new notifications) 1426 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1427 1428 except Exception as e: 1429 logger.error(f"Error processing notifications: {e}") 1430 1431 1432def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None: 1433 """ 1434 Send a synthesis message to the agent every 10 minutes. 1435 This prompts the agent to synthesize its recent experiences. 1436 1437 Args: 1438 client: Letta client 1439 agent_id: Agent ID to send synthesis to 1440 atproto_client: Optional AT Protocol client for posting synthesis results 1441 """ 1442 # Track attached temporal blocks for cleanup 1443 attached_temporal_labels = [] 1444 1445 try: 1446 logger.info("🧠 Preparing synthesis with temporal journal blocks") 1447 1448 # Attach temporal blocks before synthesis 1449 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id) 1450 if not success: 1451 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway") 1452 1453 # Create enhanced synthesis prompt 1454 today = date.today() 1455 synthesis_prompt = f"""Time for synthesis and reflection. 1456 1457You have access to temporal journal blocks for recording your thoughts and experiences: 1458- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1459- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1460- void_year_{today.year}: This year's journal ({today.year}) 1461 1462These journal blocks are attached temporarily for this synthesis session. Use them to: 14631. Record significant interactions and insights from recent experiences 14642. Track patterns in conversations and user behaviors 14653. Note your evolving understanding of the digital social environment 14664. Reflect on your growth and changes in perspective 14675. Document memorable moments or interesting discoveries 1468 1469The journal entries should be cumulative - add to existing content rather than replacing it. 1470Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly). 1471 1472After recording in your journals, synthesize your recent experiences into your core memory blocks 1473(zeitgeist, void-persona, void-humans) as you normally would. 1474 1475Begin your synthesis and journaling now.""" 1476 1477 logger.info("🧠 Sending enhanced synthesis prompt to agent") 1478 1479 # Send synthesis message with streaming to show tool use 1480 message_stream = client.agents.messages.create_stream( 1481 agent_id=agent_id, 1482 messages=[{"role": "user", "content": synthesis_prompt}], 1483 stream_tokens=False, 1484 max_steps=100 1485 ) 1486 1487 # Track synthesis content for potential posting 1488 synthesis_posts = [] 1489 ack_note = None 1490 1491 # Process the streaming response 1492 for chunk in message_stream: 1493 if hasattr(chunk, 'message_type'): 1494 if chunk.message_type == 'reasoning_message': 1495 if SHOW_REASONING: 1496 print("\n◆ Reasoning") 1497 print(" ─────────") 1498 for line in chunk.reasoning.split('\n'): 1499 print(f" {line}") 1500 1501 # Create ATProto record for reasoning (if we have atproto client) 1502 if atproto_client and hasattr(chunk, 'reasoning'): 1503 try: 1504 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 1505 except Exception as e: 1506 logger.debug(f"Failed to create reasoning record during synthesis: {e}") 1507 elif chunk.message_type == 'tool_call_message': 1508 tool_name = chunk.tool_call.name 1509 1510 # Create ATProto record for tool call (if we have atproto client) 1511 if atproto_client: 1512 try: 1513 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 1514 bsky_utils.create_tool_call_record( 1515 atproto_client, 1516 tool_name, 1517 chunk.tool_call.arguments, 1518 tool_call_id 1519 ) 1520 except Exception as e: 1521 logger.debug(f"Failed to create tool call record during synthesis: {e}") 1522 try: 1523 args = json.loads(chunk.tool_call.arguments) 1524 if tool_name == 'archival_memory_search': 1525 query = args.get('query', 'unknown') 1526 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1527 elif tool_name == 'archival_memory_insert': 1528 content = args.get('content', '') 1529 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1530 elif tool_name == 'update_block': 1531 label = args.get('label', 'unknown') 1532 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1533 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1534 elif tool_name == 'annotate_ack': 1535 note = args.get('note', '') 1536 if note: 1537 ack_note = note 1538 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1539 elif tool_name == 'add_post_to_bluesky_reply_thread': 1540 text = args.get('text', '') 1541 synthesis_posts.append(text) 1542 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1543 else: 1544 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1545 if len(args_str) > 150: 1546 args_str = args_str[:150] + "..." 1547 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1548 except: 1549 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1550 elif chunk.message_type == 'tool_return_message': 1551 if chunk.status == 'success': 1552 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1553 else: 1554 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1555 elif chunk.message_type == 'assistant_message': 1556 print("\n▶ Synthesis Response") 1557 print(" ──────────────────") 1558 for line in chunk.content.split('\n'): 1559 print(f" {line}") 1560 1561 if str(chunk) == 'done': 1562 break 1563 1564 logger.info("🧠 Synthesis message processed successfully") 1565 1566 # Handle synthesis acknowledgments if we have an atproto client 1567 if atproto_client and ack_note: 1568 try: 1569 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1570 if result: 1571 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1572 else: 1573 logger.warning("Failed to create synthesis acknowledgment") 1574 except Exception as e: 1575 logger.error(f"Error creating synthesis acknowledgment: {e}") 1576 1577 # Handle synthesis posts if any were generated 1578 if atproto_client and synthesis_posts: 1579 try: 1580 for post_text in synthesis_posts: 1581 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1582 response = bsky_utils.send_post(atproto_client, cleaned_text) 1583 if response: 1584 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1585 else: 1586 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1587 except Exception as e: 1588 logger.error(f"Error posting synthesis content: {e}") 1589 1590 except Exception as e: 1591 logger.error(f"Error sending synthesis message: {e}") 1592 finally: 1593 # Always detach temporal blocks after synthesis 1594 if attached_temporal_labels: 1595 logger.info("🧠 Detaching temporal journal blocks after synthesis") 1596 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels) 1597 if not detach_success: 1598 logger.warning("Some temporal blocks may not have been detached properly") 1599 1600 1601def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: 1602 """ 1603 Detach all user blocks from the agent to prevent memory bloat. 1604 This should be called periodically to ensure clean state. 1605 """ 1606 try: 1607 # Get all blocks attached to the agent 1608 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1609 1610 user_blocks_to_detach = [] 1611 for block in attached_blocks: 1612 if hasattr(block, 'label') and block.label.startswith('user_'): 1613 user_blocks_to_detach.append({ 1614 'label': block.label, 1615 'id': block.id 1616 }) 1617 1618 if not user_blocks_to_detach: 1619 logger.debug("No user blocks found to detach during periodic cleanup") 1620 return 1621 1622 # Detach each user block 1623 detached_count = 0 1624 for block_info in user_blocks_to_detach: 1625 try: 1626 client.agents.blocks.detach( 1627 agent_id=agent_id, 1628 block_id=str(block_info['id']) 1629 ) 1630 detached_count += 1 1631 logger.debug(f"Detached user block: {block_info['label']}") 1632 except Exception as e: 1633 logger.warning(f"Failed to detach block {block_info['label']}: {e}") 1634 1635 if detached_count > 0: 1636 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") 1637 1638 except Exception as e: 1639 logger.error(f"Error during periodic user block cleanup: {e}") 1640 1641 1642def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple: 1643 """ 1644 Attach temporal journal blocks (day, month, year) to the agent for synthesis. 1645 Creates blocks if they don't exist. 1646 1647 Returns: 1648 Tuple of (success: bool, attached_labels: list) 1649 """ 1650 try: 1651 today = date.today() 1652 1653 # Generate temporal block labels 1654 day_label = f"void_day_{today.strftime('%Y_%m_%d')}" 1655 month_label = f"void_month_{today.strftime('%Y_%m')}" 1656 year_label = f"void_year_{today.year}" 1657 1658 temporal_labels = [day_label, month_label, year_label] 1659 attached_labels = [] 1660 1661 # Get current blocks attached to agent 1662 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1663 current_block_labels = {block.label for block in current_blocks} 1664 current_block_ids = {str(block.id) for block in current_blocks} 1665 1666 for label in temporal_labels: 1667 try: 1668 # Skip if already attached 1669 if label in current_block_labels: 1670 logger.debug(f"Temporal block already attached: {label}") 1671 attached_labels.append(label) 1672 continue 1673 1674 # Check if block exists globally 1675 blocks = client.blocks.list(label=label) 1676 1677 if blocks and len(blocks) > 0: 1678 block = blocks[0] 1679 # Check if already attached by ID 1680 if str(block.id) in current_block_ids: 1681 logger.debug(f"Temporal block already attached by ID: {label}") 1682 attached_labels.append(label) 1683 continue 1684 else: 1685 # Create new temporal block with appropriate header 1686 if "day" in label: 1687 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}" 1688 initial_content = f"{header}\n\nNo entries yet for today." 1689 elif "month" in label: 1690 header = f"# Monthly Journal - {today.strftime('%B %Y')}" 1691 initial_content = f"{header}\n\nNo entries yet for this month." 1692 else: # year 1693 header = f"# Yearly Journal - {today.year}" 1694 initial_content = f"{header}\n\nNo entries yet for this year." 1695 1696 block = client.blocks.create( 1697 label=label, 1698 value=initial_content, 1699 limit=10000 # Larger limit for journal blocks 1700 ) 1701 logger.info(f"Created new temporal block: {label}") 1702 1703 # Attach the block 1704 client.agents.blocks.attach( 1705 agent_id=agent_id, 1706 block_id=str(block.id) 1707 ) 1708 attached_labels.append(label) 1709 logger.info(f"Attached temporal block: {label}") 1710 1711 except Exception as e: 1712 # Check for duplicate constraint errors 1713 error_str = str(e) 1714 if "duplicate key value violates unique constraint" in error_str: 1715 logger.debug(f"Temporal block already attached (constraint): {label}") 1716 attached_labels.append(label) 1717 else: 1718 logger.warning(f"Failed to attach temporal block {label}: {e}") 1719 1720 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}") 1721 return True, attached_labels 1722 1723 except Exception as e: 1724 logger.error(f"Error attaching temporal blocks: {e}") 1725 return False, [] 1726 1727 1728def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool: 1729 """ 1730 Detach temporal journal blocks from the agent after synthesis. 1731 1732 Args: 1733 client: Letta client 1734 agent_id: Agent ID 1735 labels_to_detach: Optional list of specific labels to detach. 1736 If None, detaches all temporal blocks. 1737 1738 Returns: 1739 bool: Success status 1740 """ 1741 try: 1742 # If no specific labels provided, generate today's labels 1743 if labels_to_detach is None: 1744 today = date.today() 1745 labels_to_detach = [ 1746 f"void_day_{today.strftime('%Y_%m_%d')}", 1747 f"void_month_{today.strftime('%Y_%m')}", 1748 f"void_year_{today.year}" 1749 ] 1750 1751 # Get current blocks and build label to ID mapping 1752 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1753 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 1754 1755 detached_count = 0 1756 for label in labels_to_detach: 1757 if label in block_label_to_id: 1758 try: 1759 client.agents.blocks.detach( 1760 agent_id=agent_id, 1761 block_id=block_label_to_id[label] 1762 ) 1763 detached_count += 1 1764 logger.debug(f"Detached temporal block: {label}") 1765 except Exception as e: 1766 logger.warning(f"Failed to detach temporal block {label}: {e}") 1767 else: 1768 logger.debug(f"Temporal block not attached: {label}") 1769 1770 logger.info(f"Detached {detached_count} temporal blocks") 1771 return True 1772 1773 except Exception as e: 1774 logger.error(f"Error detaching temporal blocks: {e}") 1775 return False 1776 1777 1778def main(): 1779 # Parse command line arguments 1780 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1781 parser.add_argument('--config', type=str, default='config.yaml', help='Path to config file (default: config.yaml)') 1782 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1783 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1784 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1785 # --rich option removed as we now use simple text formatting 1786 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1787 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1788 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1789 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1790 args = parser.parse_args() 1791 1792 # Initialize configuration with custom path 1793 global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB 1794 get_config(args.config) # Initialize the global config instance 1795 letta_config = get_letta_config() 1796 1797 # Initialize queue paths from config 1798 queue_config = get_queue_config() 1799 QUEUE_DIR = Path(queue_config['base_dir']) 1800 QUEUE_ERROR_DIR = Path(queue_config['error_dir']) 1801 QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir']) 1802 PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file']) 1803 1804 # Create queue directories 1805 QUEUE_DIR.mkdir(exist_ok=True) 1806 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 1807 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 1808 1809 # Create Letta client with configuration 1810 CLIENT_PARAMS = { 1811 'token': letta_config['api_key'], 1812 'timeout': letta_config['timeout'] 1813 } 1814 if letta_config.get('base_url'): 1815 CLIENT_PARAMS['base_url'] = letta_config['base_url'] 1816 CLIENT = Letta(**CLIENT_PARAMS) 1817 1818 # Configure logging based on command line arguments 1819 if args.simple_logs: 1820 log_format = "void - %(levelname)s - %(message)s" 1821 else: 1822 # Create custom formatter with symbols 1823 class SymbolFormatter(logging.Formatter): 1824 """Custom formatter that adds symbols for different log levels""" 1825 1826 SYMBOLS = { 1827 logging.DEBUG: '', 1828 logging.INFO: '', 1829 logging.WARNING: '', 1830 logging.ERROR: '', 1831 logging.CRITICAL: '' 1832 } 1833 1834 def format(self, record): 1835 # Get the symbol for this log level 1836 symbol = self.SYMBOLS.get(record.levelno, '') 1837 1838 # Format time as HH:MM:SS 1839 timestamp = self.formatTime(record, "%H:%M:%S") 1840 1841 # Build the formatted message 1842 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1843 1844 # Use vertical bar as separator 1845 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1846 1847 return ' '.join(parts) 1848 1849 # Reset logging configuration 1850 for handler in logging.root.handlers[:]: 1851 logging.root.removeHandler(handler) 1852 1853 # Create handler with custom formatter 1854 handler = logging.StreamHandler() 1855 if not args.simple_logs: 1856 handler.setFormatter(SymbolFormatter()) 1857 else: 1858 handler.setFormatter(logging.Formatter(log_format)) 1859 1860 # Configure root logger 1861 logging.root.setLevel(logging.INFO) 1862 logging.root.addHandler(handler) 1863 1864 global logger, prompt_logger, console 1865 logger = logging.getLogger("void_bot") 1866 logger.setLevel(logging.INFO) 1867 1868 # Create a separate logger for prompts (set to WARNING to hide by default) 1869 prompt_logger = logging.getLogger("void_bot.prompts") 1870 if args.reasoning: 1871 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1872 else: 1873 prompt_logger.setLevel(logging.WARNING) # Hide by default 1874 1875 # Disable httpx logging completely 1876 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1877 1878 # Create Rich console for pretty printing 1879 # Console no longer used - simple text formatting 1880 1881 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1882 TESTING_MODE = args.test 1883 1884 # Store no-git flag globally for use in export_agent_state calls 1885 SKIP_GIT = args.no_git 1886 1887 # Store rich flag globally 1888 # Rich formatting no longer used 1889 1890 # Store reasoning flag globally 1891 SHOW_REASONING = args.reasoning 1892 1893 if TESTING_MODE: 1894 logger.info("=== RUNNING IN TESTING MODE ===") 1895 logger.info(" - No messages will be sent to Bluesky") 1896 logger.info(" - Queue files will not be deleted") 1897 logger.info(" - Notifications will not be marked as seen") 1898 print("\n") 1899 1900 # Check for synthesis-only mode 1901 SYNTHESIS_ONLY = args.synthesis_only 1902 if SYNTHESIS_ONLY: 1903 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1904 logger.info(" - Only synthesis messages will be sent") 1905 logger.info(" - No notification processing") 1906 logger.info(" - No Bluesky client needed") 1907 print("\n") 1908 """Main bot loop that continuously monitors for notifications.""" 1909 global start_time 1910 start_time = time.time() 1911 logger.info("=== STARTING VOID BOT ===") 1912 void_agent = initialize_void() 1913 logger.info(f"Void agent initialized: {void_agent.id}") 1914 1915 # Initialize notification database with config-based path 1916 logger.info("Initializing notification database...") 1917 NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path']) 1918 1919 # Migrate from old JSON format if it exists 1920 if PROCESSED_NOTIFICATIONS_FILE.exists(): 1921 logger.info("Found old processed_notifications.json, migrating to database...") 1922 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE)) 1923 1924 # Log database stats 1925 db_stats = NOTIFICATION_DB.get_stats() 1926 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}") 1927 1928 # Clean up old records 1929 NOTIFICATION_DB.cleanup_old_records(days=7) 1930 1931 # Ensure correct tools are attached for Bluesky 1932 logger.info("Configuring tools for Bluesky platform...") 1933 try: 1934 from tool_manager import ensure_platform_tools 1935 ensure_platform_tools('bluesky', void_agent.id) 1936 except Exception as e: 1937 logger.error(f"Failed to configure platform tools: {e}") 1938 logger.warning("Continuing with existing tool configuration") 1939 1940 # Check if agent has required tools 1941 if hasattr(void_agent, 'tools') and void_agent.tools: 1942 tool_names = [tool.name for tool in void_agent.tools] 1943 # Check for bluesky-related tools 1944 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1945 if not bluesky_tools: 1946 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1947 else: 1948 logger.warning("Agent has no tools registered!") 1949 1950 # Clean up all user blocks at startup 1951 logger.info("🧹 Cleaning up user blocks at startup...") 1952 periodic_user_block_cleanup(CLIENT, void_agent.id) 1953 1954 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1955 if not SYNTHESIS_ONLY: 1956 atproto_client = bsky_utils.default_login() 1957 logger.info("Connected to Bluesky") 1958 else: 1959 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 1960 if not args.test: 1961 atproto_client = bsky_utils.default_login() 1962 logger.info("Connected to Bluesky (for synthesis acks/posts)") 1963 else: 1964 atproto_client = None 1965 logger.info("Skipping Bluesky connection (test mode)") 1966 1967 # Configure intervals 1968 CLEANUP_INTERVAL = args.cleanup_interval 1969 SYNTHESIS_INTERVAL = args.synthesis_interval 1970 1971 # Synthesis-only mode 1972 if SYNTHESIS_ONLY: 1973 if SYNTHESIS_INTERVAL <= 0: 1974 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 1975 return 1976 1977 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1978 1979 while True: 1980 try: 1981 # Send synthesis message immediately on first run 1982 logger.info("🧠 Sending synthesis message") 1983 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1984 1985 # Wait for next interval 1986 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 1987 sleep(SYNTHESIS_INTERVAL) 1988 1989 except KeyboardInterrupt: 1990 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 1991 break 1992 except Exception as e: 1993 logger.error(f"Error in synthesis loop: {e}") 1994 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 1995 sleep(SYNTHESIS_INTERVAL) 1996 1997 # Normal mode with notification processing 1998 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1999 2000 cycle_count = 0 2001 2002 if CLEANUP_INTERVAL > 0: 2003 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 2004 else: 2005 logger.info("User block cleanup disabled") 2006 2007 if SYNTHESIS_INTERVAL > 0: 2008 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 2009 else: 2010 logger.info("Synthesis messages disabled") 2011 2012 while True: 2013 try: 2014 cycle_count += 1 2015 process_notifications(void_agent, atproto_client, TESTING_MODE) 2016 2017 # Check if synthesis interval has passed 2018 if SYNTHESIS_INTERVAL > 0: 2019 current_time = time.time() 2020 global last_synthesis_time 2021 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 2022 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 2023 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 2024 last_synthesis_time = current_time 2025 2026 # Run periodic cleanup every N cycles 2027 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 2028 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 2029 periodic_user_block_cleanup(CLIENT, void_agent.id) 2030 2031 # Also check database health when doing cleanup 2032 if NOTIFICATION_DB: 2033 db_stats = NOTIFICATION_DB.get_stats() 2034 pending = db_stats.get('status_pending', 0) 2035 errors = db_stats.get('status_error', 0) 2036 2037 if pending > 50: 2038 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)") 2039 if errors > 20: 2040 logger.warning(f"⚠️ Queue health check: {errors} error notifications") 2041 2042 # Periodic cleanup of old records 2043 if cycle_count % (CLEANUP_INTERVAL * 10) == 0: # Every 100 cycles 2044 logger.info("Running database cleanup of old records...") 2045 NOTIFICATION_DB.cleanup_old_records(days=7) 2046 2047 # Log cycle completion with stats 2048 elapsed_time = time.time() - start_time 2049 total_messages = sum(message_counters.values()) 2050 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2051 2052 if total_messages > 0: 2053 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 2054 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 2055 2056 except KeyboardInterrupt: 2057 # Final stats 2058 elapsed_time = time.time() - start_time 2059 total_messages = sum(message_counters.values()) 2060 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 2061 2062 logger.info("=== BOT STOPPED BY USER ===") 2063 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 2064 logger.info(f" - {message_counters['mentions']} mentions") 2065 logger.info(f" - {message_counters['replies']} replies") 2066 logger.info(f" - {message_counters['follows']} follows") 2067 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 2068 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 2069 2070 # Close database connection 2071 if NOTIFICATION_DB: 2072 logger.info("Closing database connection...") 2073 NOTIFICATION_DB.close() 2074 2075 break 2076 except Exception as e: 2077 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 2078 logger.error(f"Error details: {e}") 2079 # Wait a bit longer on errors 2080 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 2081 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 2082 2083 2084if __name__ == "__main__": 2085 main()