# A digital person for Bluesky.
# Rich imports removed - using simple text formatting
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time
import argparse

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils
from tools.blocks import attach_user_blocks, detach_user_blocks
from datetime import date
from notification_db import NotificationDB


def extract_handles_from_data(data):
    """Recursively extract all unique handles from a nested data structure.

    Walks dicts and lists only; any other value (including tuples and
    arbitrary objects) is treated as a leaf and ignored.

    Args:
        data: Arbitrarily nested combination of dicts and lists.

    Returns:
        A list holding each distinct value found under a ``'handle'`` key.
        The values are collected in a set, so the order of the returned
        list is unspecified.
    """
    handles = set()

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            # Record this dict's own handle, if it has one ...
            if 'handle' in obj:
                handles.add(obj['handle'])
            # ... then keep descending, since nested records may carry more.
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, list):
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(handles)


# Logging will be configured after argument parsing
logger = None
prompt_logger = None
# Simple text formatting (Rich no longer used)
SHOW_REASONING = False
last_archival_query = "archival memory search"


def log_with_panel(message, title=None, border_color="white"):
    """Print a message, optionally under a glyph-prefixed, underlined title.

    Args:
        message: Text to print. When a title is given it is split on
            newlines and every line is indented.
        title: Optional heading. When falsy, ``message`` is printed as-is
            and ``border_color`` is ignored.
        border_color: Legacy colour name, used only to choose the glyph
            shown before the title. Unknown names fall back to the
            default ("white") glyph.
    """
    if not title:
        print(message)
        return

    # Map old color names to appropriate symbols.
    # The glyphs were blank in this copy of the file; they are restored to
    # match the headings printed elsewhere in the file ("◆ Reasoning",
    # "✎ Bluesky Post", "▶ Assistant Response", "Bluesky Post ✓" / "✗").
    symbol_map = {
        "blue": "⚙",    # Tool calls -- NOTE(review): stand-in glyph, original not recoverable; confirm
        "green": "✓",   # Success/completion
        "yellow": "◆",  # Reasoning
        "red": "✗",     # Errors
        "white": "▶",   # Default/mentions
        "cyan": "✎",    # Posts
    }
    symbol = symbol_map.get(border_color, symbol_map["white"])

    print(f"\n{symbol} {title}")
    # One box-drawing dash per title character. Multiplying an empty string
    # (as before) never drew anything.
    # NOTE(review): the two-column indent lines the rule and the body up with
    # the title text (glyph + space); the original run of spaces was
    # collapsed in this copy -- confirm.
    print(f"  {'─' * len(title)}")
    # Indent message lines
    for line in message.split('\n'):
        print(f"  {line}")


# Create a client
with extended timeout for LLM operations 77CLIENT= Letta( 78 token=os.environ["LETTA_API_KEY"], 79 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 80) 81 82# Use the "Bluesky" project 83PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 84 85# Notification check delay 86FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 87 88# Check for new notifications every N queue items 89CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 90 91# Queue directory 92QUEUE_DIR = Path("queue") 93QUEUE_DIR.mkdir(exist_ok=True) 94QUEUE_ERROR_DIR = Path("queue/errors") 95QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 96QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 97QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 98PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 99 100# Maximum number of processed notifications to track 101MAX_PROCESSED_NOTIFICATIONS = 10000 102 103# Message tracking counters 104message_counters = defaultdict(int) 105start_time = time.time() 106 107# Testing mode flag 108TESTING_MODE = False 109 110# Skip git operations flag 111SKIP_GIT = False 112 113# Synthesis message tracking 114last_synthesis_time = time.time() 115 116# Database for notification tracking 117NOTIFICATION_DB = None 118 119def export_agent_state(client, agent, skip_git=False): 120 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 121 try: 122 # Confirm export with user unless git is being skipped 123 if not skip_git: 124 response = input("Export agent state to files and stage with git? 
(y/n): ").lower().strip() 125 if response not in ['y', 'yes']: 126 logger.info("Agent export cancelled by user.") 127 return 128 else: 129 logger.info("Exporting agent state (git staging disabled)") 130 131 # Create directories if they don't exist 132 os.makedirs("agent_archive", exist_ok=True) 133 os.makedirs("agents", exist_ok=True) 134 135 # Export agent data 136 logger.info(f"Exporting agent {agent.id}. This takes some time...") 137 agent_data = client.agents.export_file(agent_id=agent.id) 138 139 # Save timestamped archive copy 140 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 141 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 142 with open(archive_file, 'w', encoding='utf-8') as f: 143 json.dump(agent_data, f, indent=2, ensure_ascii=False) 144 145 # Save current agent state 146 current_file = os.path.join("agents", "void.af") 147 with open(current_file, 'w', encoding='utf-8') as f: 148 json.dump(agent_data, f, indent=2, ensure_ascii=False) 149 150 logger.info(f"Agent exported to {archive_file} and {current_file}") 151 152 # Git add only the current agent file (archive is ignored) unless skip_git is True 153 if not skip_git: 154 try: 155 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 156 logger.info("Added current agent file to git staging") 157 except subprocess.CalledProcessError as e: 158 logger.warning(f"Failed to git add agent file: {e}") 159 160 except Exception as e: 161 logger.error(f"Failed to export agent: {e}") 162 163def initialize_void(): 164 logger.info("Starting void agent initialization...") 165 166 # Get the configured void agent by ID 167 logger.info("Loading void agent from config...") 168 from config_loader import get_letta_config 169 letta_config = get_letta_config() 170 agent_id = letta_config['agent_id'] 171 172 try: 173 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 174 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 175 except Exception 
as e: 176 logger.error(f"Failed to load void agent {agent_id}: {e}") 177 logger.error("Please ensure the agent_id in config.yaml is correct") 178 raise e 179 180 # Export agent state 181 logger.info("Exporting agent state...") 182 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 183 184 # Log agent details 185 logger.info(f"Void agent details - ID: {void_agent.id}") 186 logger.info(f"Agent name: {void_agent.name}") 187 if hasattr(void_agent, 'llm_config'): 188 logger.info(f"Agent model: {void_agent.llm_config.model}") 189 logger.info(f"Agent project_id: {void_agent.project_id}") 190 if hasattr(void_agent, 'tools'): 191 logger.info(f"Agent has {len(void_agent.tools)} tools") 192 for tool in void_agent.tools[:3]: # Show first 3 tools 193 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 194 195 return void_agent 196 197 198def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 199 """Process a mention and generate a reply using the Letta agent. 
200 201 Args: 202 void_agent: The Letta agent instance 203 atproto_client: The AT Protocol client 204 notification_data: The notification data dictionary 205 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 206 207 Returns: 208 True: Successfully processed, remove from queue 209 False: Failed but retryable, keep in queue 210 None: Failed with non-retryable error, move to errors directory 211 "no_reply": No reply was generated, move to no_reply directory 212 """ 213 import uuid 214 215 # Generate correlation ID for tracking this notification through the pipeline 216 correlation_id = str(uuid.uuid4())[:8] 217 218 try: 219 logger.info(f"[{correlation_id}] Starting process_mention", extra={ 220 'correlation_id': correlation_id, 221 'notification_type': type(notification_data).__name__ 222 }) 223 224 # Handle both dict and object inputs for backwards compatibility 225 if isinstance(notification_data, dict): 226 uri = notification_data['uri'] 227 mention_text = notification_data.get('record', {}).get('text', '') 228 author_handle = notification_data['author']['handle'] 229 author_name = notification_data['author'].get('display_name') or author_handle 230 else: 231 # Legacy object access 232 uri = notification_data.uri 233 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 234 author_handle = notification_data.author.handle 235 author_name = notification_data.author.display_name or author_handle 236 237 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 238 'correlation_id': correlation_id, 239 'author_handle': author_handle, 240 'author_name': author_name, 241 'mention_uri': uri, 242 'mention_text_length': len(mention_text), 243 'mention_preview': mention_text[:100] if mention_text else '' 244 }) 245 246 # Retrieve the entire thread associated with the mention 247 try: 248 thread = atproto_client.app.bsky.feed.get_post_thread({ 249 'uri': uri, 250 'parent_height': 
40, 251 'depth': 10 252 }) 253 except Exception as e: 254 error_str = str(e) 255 # Check if this is a NotFound error 256 if 'NotFound' in error_str or 'Post not found' in error_str: 257 logger.warning(f"Post not found for URI {uri}, removing from queue") 258 return True # Return True to remove from queue 259 else: 260 # Re-raise other errors 261 logger.error(f"Error fetching thread: {e}") 262 raise 263 264 # Get thread context as YAML string 265 logger.debug("Converting thread to YAML string") 266 try: 267 thread_context = thread_to_yaml_string(thread) 268 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 269 270 # Check if #voidstop appears anywhere in the thread 271 if "#voidstop" in thread_context.lower(): 272 logger.info("Found #voidstop in thread context, skipping this mention") 273 return True # Return True to remove from queue 274 275 # Also check the mention text directly 276 if "#voidstop" in mention_text.lower(): 277 logger.info("Found #voidstop in mention text, skipping this mention") 278 return True # Return True to remove from queue 279 280 # Create a more informative preview by extracting meaningful content 281 lines = thread_context.split('\n') 282 meaningful_lines = [] 283 284 for line in lines: 285 stripped = line.strip() 286 if not stripped: 287 continue 288 289 # Look for lines with actual content (not just structure) 290 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 291 meaningful_lines.append(line) 292 if len(meaningful_lines) >= 5: 293 break 294 295 if meaningful_lines: 296 preview = '\n'.join(meaningful_lines) 297 logger.debug(f"Thread content preview:\n{preview}") 298 else: 299 # If no content fields found, just show it's a thread structure 300 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 301 except Exception as yaml_error: 302 import traceback 303 logger.error(f"Error converting thread to YAML: 
{yaml_error}") 304 logger.error(f"Full traceback:\n{traceback.format_exc()}") 305 logger.error(f"Thread type: {type(thread)}") 306 if hasattr(thread, '__dict__'): 307 logger.error(f"Thread attributes: {thread.__dict__}") 308 # Try to continue with a simple context 309 thread_context = f"Error processing thread context: {str(yaml_error)}" 310 311 # Create a prompt for the Letta agent with thread context 312 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 313 314MOST RECENT POST (the mention you're responding to): 315"{mention_text}" 316 317FULL THREAD CONTEXT: 318```yaml 319{thread_context} 320``` 321 322The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 323 324To reply, use the add_post_to_bluesky_reply_thread tool: 325- Each call creates one post (max 300 characters) 326- For most responses, a single call is sufficient 327- Only use multiple calls for threaded replies when: 328 * The topic requires extended explanation that cannot fit in 300 characters 329 * You're explicitly asked for a detailed/long response 330 * The conversation naturally benefits from a structured multi-part answer 331- Avoid unnecessary threads - be concise when possible""" 332 333 # Extract all handles from notification and thread data 334 all_handles = set() 335 all_handles.update(extract_handles_from_data(notification_data)) 336 all_handles.update(extract_handles_from_data(thread.model_dump())) 337 unique_handles = list(all_handles) 338 339 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 340 341 # Check if any handles are in known_bots list 342 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 343 import json 344 345 try: 346 # Check for known bots in thread 347 bot_check_result = 
check_known_bots(unique_handles, void_agent) 348 bot_check_data = json.loads(bot_check_result) 349 350 # TEMPORARILY DISABLED: Bot detection causing issues with normal users 351 # TODO: Re-enable after debugging why normal users are being flagged as bots 352 if False: # bot_check_data.get("bot_detected", False): 353 detected_bots = bot_check_data.get("detected_bots", []) 354 logger.info(f"Bot detected in thread: {detected_bots}") 355 356 # Decide whether to respond (10% chance) 357 if not should_respond_to_bot_thread(): 358 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 359 # Return False to keep in queue for potential later processing 360 return False 361 else: 362 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 363 else: 364 logger.debug("Bot detection disabled - processing all notifications") 365 366 except Exception as bot_check_error: 367 logger.warning(f"Error checking for bots: {bot_check_error}") 368 # Continue processing if bot check fails 369 370 # Attach user blocks before agent call 371 attached_handles = [] 372 if unique_handles: 373 try: 374 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 375 attach_result = attach_user_blocks(unique_handles, void_agent) 376 attached_handles = unique_handles # Track successfully attached handles 377 logger.debug(f"Attach result: {attach_result}") 378 except Exception as attach_error: 379 logger.warning(f"Failed to attach user blocks: {attach_error}") 380 # Continue without user blocks rather than failing completely 381 382 # Get response from Letta agent 383 # Format with Unicode characters 384 title = f"MENTION FROM @{author_handle}" 385 print(f"\n{title}") 386 print(f" {'' * len(title)}") 387 # Indent the mention text 388 for line in mention_text.split('\n'): 389 print(f" {line}") 390 391 # Log prompt details to separate logger 392 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 393 394 # Log concise 
prompt info to main logger 395 thread_handles_count = len(unique_handles) 396 prompt_char_count = len(prompt) 397 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 398 399 try: 400 # Use streaming to avoid 524 timeout errors 401 message_stream = CLIENT.agents.messages.create_stream( 402 agent_id=void_agent.id, 403 messages=[{"role": "user", "content": prompt}], 404 stream_tokens=False, # Step streaming only (faster than token streaming) 405 max_steps=100 406 ) 407 408 # Collect the streaming response 409 all_messages = [] 410 for chunk in message_stream: 411 # Log condensed chunk info 412 if hasattr(chunk, 'message_type'): 413 if chunk.message_type == 'reasoning_message': 414 # Show full reasoning without truncation 415 if SHOW_REASONING: 416 # Format with Unicode characters 417 print("\n◆ Reasoning") 418 print(" ─────────") 419 # Indent reasoning lines 420 for line in chunk.reasoning.split('\n'): 421 print(f" {line}") 422 else: 423 # Default log format (only when --reasoning is used due to log level) 424 # Format with Unicode characters 425 print("\n◆ Reasoning") 426 print(" ─────────") 427 # Indent reasoning lines 428 for line in chunk.reasoning.split('\n'): 429 print(f" {line}") 430 431 # Create ATProto record for reasoning (unless in testing mode) 432 if not testing_mode and hasattr(chunk, 'reasoning'): 433 try: 434 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 435 except Exception as e: 436 logger.debug(f"Failed to create reasoning record: {e}") 437 elif chunk.message_type == 'tool_call_message': 438 # Parse tool arguments for better display 439 tool_name = chunk.tool_call.name 440 441 # Create ATProto record for tool call (unless in testing mode) 442 if not testing_mode: 443 try: 444 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 445 
bsky_utils.create_tool_call_record( 446 atproto_client, 447 tool_name, 448 chunk.tool_call.arguments, 449 tool_call_id 450 ) 451 except Exception as e: 452 logger.debug(f"Failed to create tool call record: {e}") 453 454 try: 455 args = json.loads(chunk.tool_call.arguments) 456 # Format based on tool type 457 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 458 # Extract the text being posted 459 text = args.get('text', '') 460 if text: 461 # Format with Unicode characters 462 print("\n✎ Bluesky Post") 463 print(" ────────────") 464 # Indent post text 465 for line in text.split('\n'): 466 print(f" {line}") 467 else: 468 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 469 elif tool_name == 'archival_memory_search': 470 query = args.get('query', 'unknown') 471 global last_archival_query 472 last_archival_query = query 473 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 474 elif tool_name == 'archival_memory_insert': 475 content = args.get('content', '') 476 # Show the full content being inserted 477 log_with_panel(content, f"Tool call: {tool_name}", "blue") 478 elif tool_name == 'update_block': 479 label = args.get('label', 'unknown') 480 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 481 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 482 else: 483 # Generic display for other tools 484 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 485 if len(args_str) > 150: 486 args_str = args_str[:150] + "..." 
487 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 488 except: 489 # Fallback to original format if parsing fails 490 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 491 elif chunk.message_type == 'tool_return_message': 492 # Enhanced tool result logging 493 tool_name = chunk.name 494 status = chunk.status 495 496 if status == 'success': 497 # Try to show meaningful result info based on tool type 498 if hasattr(chunk, 'tool_return') and chunk.tool_return: 499 result_str = str(chunk.tool_return) 500 if tool_name == 'archival_memory_search': 501 502 try: 503 # Handle both string and list formats 504 if isinstance(chunk.tool_return, str): 505 # The string format is: "([{...}, {...}], count)" 506 # We need to extract just the list part 507 if chunk.tool_return.strip(): 508 # Find the list part between the first [ and last ] 509 start_idx = chunk.tool_return.find('[') 510 end_idx = chunk.tool_return.rfind(']') 511 if start_idx != -1 and end_idx != -1: 512 list_str = chunk.tool_return[start_idx:end_idx+1] 513 # Use ast.literal_eval since this is Python literal syntax, not JSON 514 import ast 515 results = ast.literal_eval(list_str) 516 else: 517 logger.warning("Could not find list in archival_memory_search result") 518 results = [] 519 else: 520 logger.warning("Empty string returned from archival_memory_search") 521 results = [] 522 else: 523 # If it's already a list, use directly 524 results = chunk.tool_return 525 526 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 527 528 # Use the captured search query from the tool call 529 search_query = last_archival_query 530 531 # Combine all results into a single text block 532 content_text = "" 533 for i, entry in enumerate(results, 1): 534 timestamp = entry.get('timestamp', 'N/A') 535 content = entry.get('content', '') 536 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 537 538 # Format with Unicode 
characters 539 title = f"{search_query} ({len(results)} results)" 540 print(f"\n{title}") 541 print(f" {'' * len(title)}") 542 # Indent content text 543 for line in content_text.strip().split('\n'): 544 print(f" {line}") 545 546 except Exception as e: 547 logger.error(f"Error formatting archival memory results: {e}") 548 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 549 elif tool_name == 'add_post_to_bluesky_reply_thread': 550 # Just show success for bluesky posts, the text was already shown in tool call 551 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 552 elif tool_name == 'archival_memory_insert': 553 # Skip archival memory insert results (always returns None) 554 pass 555 elif tool_name == 'update_block': 556 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 557 else: 558 # Generic success with preview 559 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 560 log_with_panel(preview, f"Tool result: {tool_name}", "green") 561 else: 562 log_with_panel("Success", f"Tool result: {tool_name}", "green") 563 elif status == 'error': 564 # Show error details 565 if tool_name == 'add_post_to_bluesky_reply_thread': 566 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 567 log_with_panel(error_str, f"Bluesky Post ✗", "red") 568 elif tool_name == 'archival_memory_insert': 569 # Skip archival memory insert errors too 570 pass 571 else: 572 error_preview = "" 573 if hasattr(chunk, 'tool_return') and chunk.tool_return: 574 error_str = str(chunk.tool_return) 575 error_preview = error_str[:100] + "..." 
if len(error_str) > 100 else error_str 576 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 577 else: 578 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 579 else: 580 logger.info(f"Tool result: {tool_name} - {status}") 581 elif chunk.message_type == 'assistant_message': 582 # Format with Unicode characters 583 print("\n▶ Assistant Response") 584 print(" ──────────────────") 585 # Indent response text 586 for line in chunk.content.split('\n'): 587 print(f" {line}") 588 else: 589 # Filter out verbose message types 590 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 591 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 592 else: 593 logger.info(f"📦 Stream status: {chunk}") 594 595 # Log full chunk for debugging 596 logger.debug(f"Full streaming chunk: {chunk}") 597 all_messages.append(chunk) 598 if str(chunk) == 'done': 599 break 600 601 # Convert streaming response to standard format for compatibility 602 message_response = type('StreamingResponse', (), { 603 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 604 })() 605 except Exception as api_error: 606 import traceback 607 error_str = str(api_error) 608 logger.error(f"Letta API error: {api_error}") 609 logger.error(f"Error type: {type(api_error).__name__}") 610 logger.error(f"Full traceback:\n{traceback.format_exc()}") 611 logger.error(f"Mention text was: {mention_text}") 612 logger.error(f"Author: @{author_handle}") 613 logger.error(f"URI: {uri}") 614 615 616 # Try to extract more info from different error types 617 if hasattr(api_error, 'response'): 618 logger.error(f"Error response object exists") 619 if hasattr(api_error.response, 'text'): 620 logger.error(f"Response text: {api_error.response.text}") 621 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 622 try: 623 logger.error(f"Response JSON: {api_error.response.json()}") 624 except: 625 pass 626 627 # Check for specific error 
types 628 if hasattr(api_error, 'status_code'): 629 logger.error(f"API Status code: {api_error.status_code}") 630 if hasattr(api_error, 'body'): 631 logger.error(f"API Response body: {api_error.body}") 632 if hasattr(api_error, 'headers'): 633 logger.error(f"API Response headers: {api_error.headers}") 634 635 if api_error.status_code == 413: 636 logger.error("413 Payload Too Large - moving to errors directory") 637 return None # Move to errors directory - payload is too large to ever succeed 638 elif api_error.status_code == 524: 639 logger.error("524 error - timeout from Cloudflare, will retry later") 640 return False # Keep in queue for retry 641 642 # Check if error indicates we should remove from queue 643 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 644 logger.warning("Payload too large error, moving to errors directory") 645 return None # Move to errors directory - cannot be fixed by retry 646 elif 'status_code: 524' in error_str: 647 logger.warning("524 timeout error, keeping in queue for retry") 648 return False # Keep in queue for retry 649 650 raise 651 652 # Log successful response 653 logger.debug("Successfully received response from Letta API") 654 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 655 656 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 657 reply_candidates = [] 658 tool_call_results = {} # Map tool_call_id to status 659 ack_note = None # Track any note from annotate_ack tool 660 661 logger.debug(f"Processing {len(message_response.messages)} response messages...") 662 663 # First pass: collect tool return statuses 664 ignored_notification = False 665 ignore_reason = "" 666 ignore_category = "" 667 668 for message in message_response.messages: 669 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 670 if message.name == 
'add_post_to_bluesky_reply_thread': 671 tool_call_results[message.tool_call_id] = message.status 672 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 673 elif message.name == 'ignore_notification': 674 # Check if the tool was successful 675 if hasattr(message, 'tool_return') and message.status == 'success': 676 # Parse the return value to extract category and reason 677 result_str = str(message.tool_return) 678 if 'IGNORED_NOTIFICATION::' in result_str: 679 parts = result_str.split('::') 680 if len(parts) >= 3: 681 ignore_category = parts[1] 682 ignore_reason = parts[2] 683 ignored_notification = True 684 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 685 elif message.name == 'bluesky_reply': 686 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 687 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 688 logger.error("Update the agent's tools using register_tools.py") 689 # Export agent state before terminating 690 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 691 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 692 exit(1) 693 694 # Second pass: process messages and check for successful tool calls 695 for i, message in enumerate(message_response.messages, 1): 696 # Log concise message info instead of full object 697 msg_type = getattr(message, 'message_type', 'unknown') 698 if hasattr(message, 'reasoning') and message.reasoning: 699 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 700 elif hasattr(message, 'tool_call') and message.tool_call: 701 tool_name = message.tool_call.name 702 logger.debug(f" {i}. {msg_type}: {tool_name}") 703 elif hasattr(message, 'tool_return'): 704 tool_name = getattr(message, 'name', 'unknown_tool') 705 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 706 status = getattr(message, 'status', 'unknown') 707 logger.debug(f" {i}. 
{msg_type}: {tool_name} -> {return_preview}... (status: {status})") 708 elif hasattr(message, 'text'): 709 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 710 else: 711 logger.debug(f" {i}. {msg_type}: <no content>") 712 713 # Check for halt_activity tool call 714 if hasattr(message, 'tool_call') and message.tool_call: 715 if message.tool_call.name == 'halt_activity': 716 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 717 try: 718 args = json.loads(message.tool_call.arguments) 719 reason = args.get('reason', 'Agent requested halt') 720 logger.info(f"Halt reason: {reason}") 721 except: 722 logger.info("Halt reason: <unable to parse>") 723 724 # Delete the queue file before terminating 725 if queue_filepath and queue_filepath.exists(): 726 queue_filepath.unlink() 727 logger.info(f"Deleted queue file: {queue_filepath.name}") 728 729 # Also mark as processed to avoid reprocessing 730 if NOTIFICATION_DB: 731 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed') 732 else: 733 processed_uris = load_processed_notifications() 734 processed_uris.add(notification_data.get('uri', '')) 735 save_processed_notifications(processed_uris) 736 737 # Export agent state before terminating 738 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 739 740 # Exit the program 741 logger.info("=== BOT TERMINATED BY AGENT ===") 742 exit(0) 743 744 # Check for deprecated bluesky_reply tool 745 if hasattr(message, 'tool_call') and message.tool_call: 746 if message.tool_call.name == 'bluesky_reply': 747 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 748 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 749 logger.error("Update the agent's tools using register_tools.py") 750 # Export agent state before terminating 751 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 752 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 753 exit(1) 754 755 # Collect annotate_ack 
tool calls 756 elif message.tool_call.name == 'annotate_ack': 757 try: 758 args = json.loads(message.tool_call.arguments) 759 note = args.get('note', '') 760 if note: 761 ack_note = note 762 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 763 except json.JSONDecodeError as e: 764 logger.error(f"Failed to parse annotate_ack arguments: {e}") 765 766 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 767 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 768 tool_call_id = message.tool_call.tool_call_id 769 tool_status = tool_call_results.get(tool_call_id, 'unknown') 770 771 if tool_status == 'success': 772 try: 773 args = json.loads(message.tool_call.arguments) 774 reply_text = args.get('text', '') 775 reply_lang = args.get('lang', 'en-US') 776 777 if reply_text: # Only add if there's actual content 778 reply_candidates.append((reply_text, reply_lang)) 779 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... 
(lang: {reply_lang})") 780 except json.JSONDecodeError as e: 781 logger.error(f"Failed to parse tool call arguments: {e}") 782 elif tool_status == 'error': 783 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 784 else: 785 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 786 787 # Check for conflicting tool calls 788 if reply_candidates and ignored_notification: 789 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 790 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 791 logger.warning("Item will be left in queue for manual review") 792 # Return False to keep in queue 793 return False 794 795 if reply_candidates: 796 # Aggregate reply posts into a thread 797 reply_messages = [] 798 reply_langs = [] 799 for text, lang in reply_candidates: 800 reply_messages.append(text) 801 reply_langs.append(lang) 802 803 # Use the first language for the entire thread (could be enhanced later) 804 reply_lang = reply_langs[0] if reply_langs else 'en-US' 805 806 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 807 808 # Display the generated reply thread 809 if len(reply_messages) == 1: 810 content = reply_messages[0] 811 title = f"Reply to @{author_handle}" 812 else: 813 content = "\n\n".join([f"{j}. 
{msg}" for j, msg in enumerate(reply_messages, 1)]) 814 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 815 816 # Format with Unicode characters 817 print(f"\n{title}") 818 print(f" {'' * len(title)}") 819 # Indent content lines 820 for line in content.split('\n'): 821 print(f" {line}") 822 823 # Send the reply(s) with language (unless in testing mode) 824 if testing_mode: 825 logger.info("TESTING MODE: Skipping actual Bluesky post") 826 response = True # Simulate success 827 else: 828 if len(reply_messages) == 1: 829 # Single reply - use existing function 830 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 831 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 832 response = bsky_utils.reply_to_notification( 833 client=atproto_client, 834 notification=notification_data, 835 reply_text=cleaned_text, 836 lang=reply_lang, 837 correlation_id=correlation_id 838 ) 839 else: 840 # Multiple replies - use new threaded function 841 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 842 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 843 response = bsky_utils.reply_with_thread_to_notification( 844 client=atproto_client, 845 notification=notification_data, 846 reply_messages=cleaned_messages, 847 lang=reply_lang, 848 correlation_id=correlation_id 849 ) 850 851 if response: 852 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={ 853 'correlation_id': correlation_id, 854 'author_handle': author_handle, 855 'reply_count': len(reply_messages) 856 }) 857 858 # Acknowledge the post we're replying to with stream.thought.ack 859 try: 860 post_uri = notification_data.get('uri') 861 post_cid = notification_data.get('cid') 862 863 if post_uri and post_cid: 864 ack_result = bsky_utils.acknowledge_post( 865 client=atproto_client, 866 post_uri=post_uri, 867 post_cid=post_cid, 868 note=ack_note 869 
) 870 if ack_result: 871 if ack_note: 872 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 873 else: 874 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 875 else: 876 logger.warning(f"Failed to acknowledge post from @{author_handle}") 877 else: 878 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 879 except Exception as e: 880 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 881 # Don't fail the entire operation if acknowledgment fails 882 883 return True 884 else: 885 logger.error(f"Failed to send reply to @{author_handle}") 886 return False 887 else: 888 # Check if notification was explicitly ignored 889 if ignored_notification: 890 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={ 891 'correlation_id': correlation_id, 892 'author_handle': author_handle, 893 'ignore_category': ignore_category 894 }) 895 return "ignored" 896 else: 897 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={ 898 'correlation_id': correlation_id, 899 'author_handle': author_handle 900 }) 901 return "no_reply" 902 903 except Exception as e: 904 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={ 905 'correlation_id': correlation_id, 906 'error': str(e), 907 'error_type': type(e).__name__, 908 'author_handle': author_handle if 'author_handle' in locals() else 'unknown' 909 }) 910 return False 911 finally: 912 # Detach user blocks after agent response (success or failure) 913 if 'attached_handles' in locals() and attached_handles: 914 try: 915 logger.info(f"Detaching user blocks for handles: {attached_handles}") 916 detach_result = detach_user_blocks(attached_handles, void_agent) 917 logger.debug(f"Detach result: {detach_result}") 918 except 
def notification_to_dict(notification):
    """Flatten a notification object into a JSON-serializable dictionary.

    Copies ``uri``, ``cid``, ``reason``, ``is_read`` and ``indexed_at`` from
    the notification, the author's ``handle``, ``display_name`` and ``did``,
    and the record's ``text``.  The text falls back to an empty string when
    the notification has no ``record`` attribute or the record has no
    ``text`` attribute.
    """
    payload = {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
    }

    sender = notification.author
    payload['author'] = {
        'handle': sender.handle,
        'display_name': sender.display_name,
        'did': sender.did,
    }

    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')
    else:
        record_text = ''
    payload['record'] = {'text': record_text}

    return payload


def load_processed_notifications():
    """Return the processed notification URIs known to the database.

    Asks ``NOTIFICATION_DB`` for at most ``MAX_PROCESSED_NOTIFICATIONS`` URIs.
    When no database is configured an empty set is returned.
    """
    if not NOTIFICATION_DB:
        return set()
    return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)


def save_processed_notifications(processed_set):
    """Do nothing; kept so existing callers keep working.

    Individual notifications are marked as processed in the database, so
    there is no separate set to persist here.
    """
    return None
def save_notification_to_queue(notification, is_priority=None):
    """Write a notification to the queue directory as a JSON file.

    Args:
        notification: Either a notification object (converted with
            ``notification_to_dict``) or an already-flattened dict.
        is_priority: ``True``/``False`` forces the ``0_``/``1_`` filename
            prefix.  When ``None`` the prefix is derived from the author
            handle.

    Returns:
        ``True`` when a new queue file was written.  ``False`` when the
        notification was already processed, is already present in the
        queue, or an error occurred (the error is logged, not raised).
    """
    try:
        # Accept both notification objects and plain dicts.
        if isinstance(notification, dict):
            notif_dict = notification
            notification_uri = notification.get('uri')
        else:
            notif_dict = notification_to_dict(notification)
            notification_uri = notification.uri

        # Skip anything that has already been handled.
        if NOTIFICATION_DB:
            if NOTIFICATION_DB.is_processed(notification_uri):
                logger.debug(f"Notification already processed (DB): {notification_uri}")
                return False
            # Register the notification in the database.
            NOTIFICATION_DB.add_notification(notif_dict)
        else:
            # No database configured: fall back to the processed-URI set.
            processed_uris = load_processed_notifications()
            if notification_uri in processed_uris:
                logger.debug(f"Notification already processed: {notification_uri}")
                return False

        # A short content hash keeps filenames unique per payload.
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # "0_" sorts before "1_", so priority files are processed first.
        if is_priority is not None:
            priority_prefix = "0_" if is_priority else "1_"
        else:
            if isinstance(notification, dict):
                author_handle = notification.get('author', {}).get('handle', '')
            else:
                author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
            # Prioritize cameron.pfiffer.org responses
            priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        reason = notif_dict.get('reason', 'unknown')
        filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Refuse to queue the same URI twice, even under a different filename.
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r', encoding='utf-8') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification_uri:
                    logger.debug(f"Notification already queued (URI: {notification_uri})")
                    return False
            except Exception as scan_error:
                # Unreadable or malformed queue files are skipped, as before.
                # `except Exception` (rather than a bare `except`) lets
                # KeyboardInterrupt and SystemExit propagate.
                logger.debug(f"Skipping unreadable queue file {existing_file.name}: {scan_error}")
                continue

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
def _mark_notification_processed(uri, status):
    """Record *uri* as handled, in the database when one is configured."""
    if NOTIFICATION_DB:
        NOTIFICATION_DB.mark_processed(uri, status=status)
    else:
        processed_uris = load_processed_notifications()
        processed_uris.add(uri)
        save_processed_notifications(processed_uris)


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Process every notification file in the queue, priority files first.

    Queue files are sorted by name, so files with the ``0_`` prefix come
    before ``1_`` files.  ``like`` notifications are deleted up front.  Each
    remaining file is dispatched on its ``reason`` and then disposed of
    according to the processing result:

    * ``None``       -> moved to ``QUEUE_ERROR_DIR``
    * falsy          -> left in the queue for a later retry
    * ``"no_reply"`` -> moved to ``QUEUE_NO_REPLY_DIR``
    * ``"ignored"``  -> deleted
    * other truthy   -> deleted and marked as processed

    In testing mode every truthy result leaves the queue file in place.

    Args:
        void_agent: Agent object; its ``id`` is used for follow updates and
            it is passed through to ``process_mention``.
        atproto_client: Client used to fetch new notifications and reply.
        testing_mode: When true, queue files of handled items are kept.

    Errors are logged and never raised.
    """
    try:
        # Sorting by name puts priority files (0_ prefix) before normal ones.
        all_queue_files = sorted(
            f for f in QUEUE_DIR.glob("*.json")
            if f.name != "processed_notifications.json"
        )

        # Drop like notifications immediately; they are never processed.
        queue_files = []
        likes_deleted = 0

        for filepath in all_queue_files:
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                if notif_data.get('reason') == 'like':
                    filepath.unlink()
                    likes_deleted += 1
                    logger.debug(f"Deleted like notification: {filepath.name}")
                else:
                    queue_files.append(filepath)
            except Exception as e:
                logger.warning(f"Error checking notification file {filepath.name}: {e}")
                queue_files.append(filepath)  # Keep it in case it's valid

        if likes_deleted > 0:
            logger.info(f"Deleted {likes_deleted} like notifications from queue")

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current session statistics.
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        # Only used for progress logging.  The loop below walks the snapshot
        # taken above; files queued while it runs are not part of this pass.
        queue_total = len(queue_files)

        for i, filepath in enumerate(queue_files, 1):
            is_priority = filepath.name.startswith("0_")

            # Poll for new notifications every N items, and whenever the
            # current item is a priority file (never before the first item).
            should_check_notifications = (
                (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
                or (is_priority and i > 1)
            )

            if should_check_notifications:
                logger.info(f"🔄 Checking for new notifications (processed {i-1}/{queue_total} queue items)")
                try:
                    # Fetch and queue new notifications without processing them
                    new_count = fetch_and_queue_new_notifications(atproto_client)

                    if new_count > 0:
                        logger.info(f"Added {new_count} new notifications to queue")
                        queue_total = sum(
                            1 for f in QUEUE_DIR.glob("*.json")
                            if f.name != "processed_notifications.json"
                        )
                        logger.info(f"Queue updated: now {queue_total} total items")
                except Exception as e:
                    logger.error(f"Error checking for new notifications: {e}")

            priority_label = " [PRIORITY]" if is_priority else ""
            logger.info(f"Processing queue file {i}/{queue_total}{priority_label}: {filepath.name}")
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Dispatch on the notification type using the dict directly.
                reason = notif_data['reason']
                success = False
                if reason == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['mentions'] += 1
                elif reason == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['replies'] += 1
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    follow_message = f"Update: {follow_update}"
                    logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": follow_message}]
                    )
                    success = True  # Follow updates are always successful
                    message_counters['follows'] += 1
                elif reason == "repost":
                    # Skipped silently, but counted and removed from the queue.
                    success = True
                    message_counters['reposts_skipped'] += 1
                elif reason == "like":
                    # Skipped silently, but counted and removed from the queue.
                    success = True
                    message_counters.setdefault('likes_skipped', 0)
                    message_counters['likes_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Dispose of the file.  The string sentinels are non-empty and
                # therefore truthy, so they must be tested BEFORE the generic
                # truthiness check or their branches can never run.
                if success is None:
                    # Special case: move to the errors directory.
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"Moved {filepath.name} to errors directory")
                    _mark_notification_processed(notif_data['uri'], 'error')

                elif not success:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

                elif testing_mode:
                    # Testing mode preserves queue files for every handled item.
                    logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")

                elif success == "no_reply":
                    no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                    filepath.rename(no_reply_path)
                    logger.info(f"Moved {filepath.name} to no_reply directory")
                    # NOTE(review): status 'error' is what this branch always
                    # specified; confirm it is the intended status for no_reply.
                    _mark_notification_processed(notif_data['uri'], 'error')

                elif success == "ignored":
                    # Explicitly ignored notifications are deleted, not archived.
                    filepath.unlink()
                    logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
                    # NOTE(review): status 'error' is what this branch always
                    # specified; confirm it is the intended status for ignored.
                    _mark_notification_processed(notif_data['uri'], 'error')

                else:
                    filepath.unlink()
                    logger.info(f"Successfully processed and removed: {filepath.name}")
                    _mark_notification_processed(notif_data['uri'], 'processed')

            except Exception as e:
                # The file stays in the queue so it can be retried later.
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
def fetch_and_queue_new_notifications(atproto_client):
    """Fetch notifications from the API and queue the new ones.

    Pages through ``list_notifications`` (100 per page, at most 20 pages),
    marks the notifications as seen, then queues every notification that is
    not a like, is newer than the last processed timestamp (when the database
    provides one) and is not already processed.  Nothing is processed here.

    Args:
        atproto_client: Client exposing ``get_current_time_iso()`` and
            ``app.bsky.notification``.

    Returns:
        Number of notifications written to the queue; ``0`` on error.
    """
    priority_keywords = ('urgent', 'priority', 'important', 'emergency')

    try:
        # Timestamp used when marking the notifications as seen.
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        # Timestamp of the last processed notification, used for filtering.
        last_processed_time = None
        if NOTIFICATION_DB:
            last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
            if last_processed_time:
                logger.debug(f"Last processed notification was at: {last_processed_time}")

        # Fetch ALL notifications using pagination.
        all_notifications = []
        cursor = None
        page_count = 0
        max_pages = 20  # Safety limit to prevent infinite loops

        while page_count < max_pages:
            try:
                params = {'cursor': cursor, 'limit': 100} if cursor else {'limit': 100}
                notifications_response = atproto_client.app.bsky.notification.list_notifications(
                    params=params
                )

                page_count += 1
                page_notifications = notifications_response.notifications

                if not page_notifications:
                    break

                all_notifications.extend(page_notifications)

                # Stop when the API reports no further pages.
                cursor = getattr(notifications_response, 'cursor', None)
                if not cursor:
                    break

            except Exception as e:
                logger.error(f"Error fetching notifications page {page_count}: {e}")
                break

        new_count = 0
        if all_notifications:
            logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")

            # Mark as seen first; a failure here does not stop queueing.
            try:
                atproto_client.app.bsky.notification.update_seen(
                    data={'seenAt': last_seen_at}
                )
                logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
            except Exception as e:
                logger.error(f"Error marking notifications as seen: {e}")

            # Counters for the summary log below.
            skipped_read = 0
            skipped_likes = 0
            skipped_processed = 0
            skipped_old_timestamp = 0
            processed_uris = load_processed_notifications()

            for notif in all_notifications:
                # Skip anything not newer than the last processed notification.
                if last_processed_time and hasattr(notif, 'indexed_at'):
                    if notif.indexed_at <= last_processed_time:
                        skipped_old_timestamp += 1
                        logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
                        continue

                # is_read is only counted for diagnostics; it never causes a skip.
                if hasattr(notif, 'is_read') and notif.is_read:
                    skipped_read += 1
                    logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")

                # Skip likes
                if hasattr(notif, 'reason') and notif.reason == 'like':
                    skipped_likes += 1
                    continue

                notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif

                # Likes that only show up in dict form are skipped (and counted) too.
                if notif_dict.get('reason') == 'like':
                    skipped_likes += 1
                    continue

                notif_uri = notif_dict.get('uri', '')
                if notif_uri in processed_uris:
                    skipped_processed += 1
                    logger.debug(f"Skipping already processed: {notif_uri}")
                    continue

                # `or {}` / `or ''` guard against keys present with a None
                # value, which would otherwise raise and abort the whole batch.
                author_handle = (notif_dict.get('author') or {}).get('handle', '')

                # Priority for cameron.pfiffer.org notifications
                is_priority = author_handle == "cameron.pfiffer.org"

                # Mentions containing a priority keyword are prioritized too.
                if notif_dict.get('reason') == 'mention':
                    record = notif_dict.get('record') or {}
                    text = (record.get('text') or '').lower()
                    if any(keyword in text for keyword in priority_keywords):
                        is_priority = True

                if save_notification_to_queue(notif_dict, is_priority=is_priority):
                    new_count += 1
                    logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")

            # Log summary of filtering
            logger.info(f"📊 Notification processing summary:")
            logger.info(f" • Total fetched: {len(all_notifications)}")
            logger.info(f" • Had is_read=True: {skipped_read} (not skipped)")
            logger.info(f" • Skipped (likes): {skipped_likes}")
            logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}")
            logger.info(f" • Skipped (already processed): {skipped_processed}")
            logger.info(f" • Queued for processing: {new_count}")
        else:
            logger.debug("No new notifications to queue")

        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queueing notifications: {e}")
        return 0
def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Queue any new notifications, then work through the whole queue.

    Any exception raised by either step is logged and swallowed.
    """
    try:
        # Step 1: pull new notifications into the queue.
        queued = fetch_and_queue_new_notifications(atproto_client)
        if queued > 0:
            logger.info(f"Found {queued} new notifications to process")

        # Step 2: process everything in the queue, old and new alike.
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
    except Exception as exc:
        logger.error(f"Error processing notifications: {exc}")
def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
    """
    Send a synthesis prompt to the agent and handle its streamed response.

    Temporal journal blocks are attached before the prompt is sent and
    detached again afterwards, whether or not the synthesis succeeded.
    Tool calls seen in the stream are logged; an ``annotate_ack`` note and
    any ``add_post_to_bluesky_reply_thread`` texts are collected and, when an
    AT Protocol client is supplied, published after the stream ends.

    This function sends a single prompt; how often it runs is up to the
    caller.

    Args:
        client: Letta client
        agent_id: Agent ID to send synthesis to
        atproto_client: Optional AT Protocol client for posting synthesis results

    Errors are logged and never raised.
    """
    # Track attached temporal blocks for cleanup
    attached_temporal_labels = []

    try:
        logger.info("🧠 Preparing synthesis with temporal journal blocks")

        # Attach temporal blocks before synthesis
        success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
        if not success:
            logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")

        # Create enhanced synthesis prompt
        today = date.today()
        synthesis_prompt = f"""Time for synthesis and reflection.

You have access to temporal journal blocks for recording your thoughts and experiences:
- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
- void_year_{today.year}: This year's journal ({today.year})

These journal blocks are attached temporarily for this synthesis session. Use them to:
1. Record significant interactions and insights from recent experiences
2. Track patterns in conversations and user behaviors
3. Note your evolving understanding of the digital social environment
4. Reflect on your growth and changes in perspective
5. Document memorable moments or interesting discoveries

The journal entries should be cumulative - add to existing content rather than replacing it.
Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).

After recording in your journals, synthesize your recent experiences into your core memory blocks
(zeitgeist, void-persona, void-humans) as you normally would.

Begin your synthesis and journaling now."""

        logger.info("🧠 Sending enhanced synthesis prompt to agent")

        # Send synthesis message with streaming to show tool use
        message_stream = client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": synthesis_prompt}],
            stream_tokens=False,
            max_steps=100
        )

        # Track synthesis content for potential posting
        synthesis_posts = []
        ack_note = None

        # Process the streaming response
        for chunk in message_stream:
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'reasoning_message':
                    if SHOW_REASONING:
                        print("\n◆ Reasoning")
                        print(" ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f" {line}")

                    # Create ATProto record for reasoning (if we have atproto client)
                    if atproto_client and hasattr(chunk, 'reasoning'):
                        try:
                            bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
                        except Exception as e:
                            logger.debug(f"Failed to create reasoning record during synthesis: {e}")

                elif chunk.message_type == 'tool_call_message':
                    tool_name = chunk.tool_call.name

                    # Create ATProto record for tool call (if we have atproto client)
                    if atproto_client:
                        try:
                            tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
                            bsky_utils.create_tool_call_record(
                                atproto_client,
                                tool_name,
                                chunk.tool_call.arguments,
                                tool_call_id
                            )
                        except Exception as e:
                            logger.debug(f"Failed to create tool call record during synthesis: {e}")

                    try:
                        args = json.loads(chunk.tool_call.arguments)
                        if tool_name == 'archival_memory_search':
                            query = args.get('query', 'unknown')
                            log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'archival_memory_insert':
                            content = args.get('content', '')
                            log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'update_block':
                            label = args.get('label', 'unknown')
                            value_text = str(args.get('value', ''))
                            value_preview = value_text[:100] + "..." if len(value_text) > 100 else value_text
                            log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'annotate_ack':
                            note = args.get('note', '')
                            if note:
                                # Remembered so the acknowledgment can be created after the stream.
                                ack_note = note
                                log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'add_post_to_bluesky_reply_thread':
                            # Collected here and posted after the stream ends.
                            text = args.get('text', '')
                            synthesis_posts.append(text)
                            log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
                        else:
                            args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
                            if len(args_str) > 150:
                                args_str = args_str[:150] + "..."
                            log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                    except Exception:
                        # Arguments that cannot be parsed or displayed are shown raw.
                        # `except Exception` (rather than a bare `except`) lets
                        # KeyboardInterrupt and SystemExit propagate.
                        log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")

                elif chunk.message_type == 'tool_return_message':
                    if chunk.status == 'success':
                        log_with_panel("Success", f"Tool result: {chunk.name}", "green")
                    else:
                        log_with_panel("Error", f"Tool result: {chunk.name}", "red")

                elif chunk.message_type == 'assistant_message':
                    print("\n▶ Synthesis Response")
                    print(" ──────────────────")
                    for line in chunk.content.split('\n'):
                        print(f" {line}")

            if str(chunk) == 'done':
                break

        logger.info("🧠 Synthesis message processed successfully")

        # Handle synthesis acknowledgments if we have an atproto client
        if atproto_client and ack_note:
            try:
                result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
                if result:
                    logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
                else:
                    logger.warning("Failed to create synthesis acknowledgment")
            except Exception as e:
                logger.error(f"Error creating synthesis acknowledgment: {e}")

        # Handle synthesis posts if any were generated
        if atproto_client and synthesis_posts:
            try:
                for post_text in synthesis_posts:
                    cleaned_text = bsky_utils.remove_outside_quotes(post_text)
                    response = bsky_utils.send_post(atproto_client, cleaned_text)
                    if response:
                        logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
                    else:
                        logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
            except Exception as e:
                logger.error(f"Error posting synthesis content: {e}")

    except Exception as e:
        logger.error(f"Error sending synthesis message: {e}")
    finally:
        # Always detach temporal blocks after synthesis
        if attached_temporal_labels:
            logger.info("🧠 Detaching temporal journal blocks after synthesis")
            detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels)
            if not detach_success:
                logger.warning("Some temporal blocks may not have been detached properly")
def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
    """
    Detach every block whose label starts with ``user_`` from the agent.

    Intended to be run periodically so user blocks do not accumulate on the
    agent.  A failure to detach one block is logged and does not stop the
    remaining blocks from being detached.
    """
    try:
        # Collect (label, id) pairs for the user blocks currently attached.
        user_blocks = [
            (blk.label, blk.id)
            for blk in client.agents.blocks.list(agent_id=agent_id)
            if hasattr(blk, 'label') and blk.label.startswith('user_')
        ]

        if len(user_blocks) == 0:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        # Detach them one at a time, counting the successes.
        detached_count = 0
        for block_label, block_id in user_blocks:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=str(block_id)
                )
            except Exception as detach_error:
                logger.warning(f"Failed to detach block {block_label}: {detach_error}")
            else:
                detached_count += 1
                logger.debug(f"Detached user block: {block_label}")

        if detached_count > 0:
            logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")

    except Exception as cleanup_error:
        logger.error(f"Error during periodic user block cleanup: {cleanup_error}")
def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple:
    """
    Attach temporal journal blocks (day, month, year) to the agent for synthesis.
    Creates blocks if they don't exist.

    Args:
        client: Letta client
        agent_id: Agent ID

    Returns:
        Tuple of (success: bool, attached_labels: list).  ``success`` is True
        only when all three blocks ended up attached, so a partial failure is
        visible to callers that check the flag.  ``attached_labels`` lists the
        labels that are attached (already attached or newly attached).
    """
    try:
        today = date.today()

        # (label, journal header, period wording) for each temporal block.
        journals = [
            (
                f"void_day_{today.strftime('%Y_%m_%d')}",
                f"# Daily Journal - {today.strftime('%B %d, %Y')}",
                "today",
            ),
            (
                f"void_month_{today.strftime('%Y_%m')}",
                f"# Monthly Journal - {today.strftime('%B %Y')}",
                "this month",
            ),
            (
                f"void_year_{today.year}",
                f"# Yearly Journal - {today.year}",
                "this year",
            ),
        ]
        attached_labels = []

        # Snapshot of what is attached to the agent right now.
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        current_block_labels = {block.label for block in current_blocks}
        current_block_ids = {str(block.id) for block in current_blocks}

        for label, header, period in journals:
            try:
                # Skip if already attached
                if label in current_block_labels:
                    logger.debug(f"Temporal block already attached: {label}")
                    attached_labels.append(label)
                    continue

                # Check if block exists globally
                blocks = client.blocks.list(label=label)

                if blocks and len(blocks) > 0:
                    block = blocks[0]
                    # Check if already attached by ID
                    if str(block.id) in current_block_ids:
                        logger.debug(f"Temporal block already attached by ID: {label}")
                        attached_labels.append(label)
                        continue
                else:
                    # Create a new journal block with a placeholder entry.
                    initial_content = f"{header}\n\nNo entries yet for {period}."
                    block = client.blocks.create(
                        label=label,
                        value=initial_content,
                        limit=10000  # Larger limit for journal blocks
                    )
                    logger.info(f"Created new temporal block: {label}")

                # Attach the block (existing or newly created).
                client.agents.blocks.attach(
                    agent_id=agent_id,
                    block_id=str(block.id)
                )
                attached_labels.append(label)
                logger.info(f"Attached temporal block: {label}")

            except Exception as e:
                # A duplicate-constraint error is treated as "already attached".
                error_str = str(e)
                if "duplicate key value violates unique constraint" in error_str:
                    logger.debug(f"Temporal block already attached (constraint): {label}")
                    attached_labels.append(label)
                else:
                    logger.warning(f"Failed to attach temporal block {label}: {e}")

        logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(journals)}")
        return len(attached_labels) == len(journals), attached_labels

    except Exception as e:
        logger.error(f"Error attaching temporal blocks: {e}")
        return False, []
def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool:
    """
    Detach temporal journal blocks from the agent after synthesis.

    Args:
        client: Letta client
        agent_id: Agent ID
        labels_to_detach: Optional list of specific labels to detach.
                          If None, today's day, month and year journal
                          labels are used (not every temporal block).

    Returns:
        bool: True when every targeted block that was attached has been
        detached. Labels that are not currently attached are skipped and
        do not count as failures. False when listing the agent's blocks
        fails or when any individual detach call raises.
    """
    try:
        # If no specific labels provided, generate today's labels
        if labels_to_detach is None:
            today = date.today()
            labels_to_detach = [
                f"void_day_{today.strftime('%Y_%m_%d')}",
                f"void_month_{today.strftime('%Y_%m')}",
                f"void_year_{today.year}"
            ]

        # Get current blocks and build label to ID mapping
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        block_label_to_id = {block.label: str(block.id) for block in current_blocks}

        detached_count = 0
        failed_count = 0
        for label in labels_to_detach:
            block_id = block_label_to_id.get(label)
            if block_id is None:
                logger.debug(f"Temporal block not attached: {label}")
                continue

            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=block_id
                )
            except Exception as e:
                # Keep going, but remember the failure so the caller can
                # tell that some blocks may still be attached.
                failed_count += 1
                logger.warning(f"Failed to detach temporal block {label}: {e}")
            else:
                detached_count += 1
                logger.debug(f"Detached temporal block: {label}")

        logger.info(f"Detached {detached_count} temporal blocks")
        return failed_count == 0

    except Exception as e:
        logger.error(f"Error detaching temporal blocks: {e}")
        return False
def main():
    """Main bot loop that continuously monitors for notifications.

    Parses command line arguments, configures logging, initializes the
    agent and the notification database, then runs one of two loops:

    * synthesis-only mode (``--synthesis-only``): send a synthesis message
      every ``--synthesis-interval`` seconds until interrupted;
    * normal mode: process notifications every
      ``FETCH_NOTIFICATIONS_DELAY_SEC`` seconds, with optional periodic
      synthesis and user block cleanup.

    Both loops stop on KeyboardInterrupt; other exceptions inside a cycle
    are logged and the loop continues after a delay.
    """
    # All module-level names this function assigns are declared up front.
    global logger, prompt_logger
    global TESTING_MODE, SKIP_GIT, SHOW_REASONING
    global start_time, NOTIFICATION_DB, last_synthesis_time

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
    parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
    # --rich option removed as we now use simple text formatting
    parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
    parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
    parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
    parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
    args = parser.parse_args()

    # Reset logging configuration
    for existing_handler in logging.root.handlers[:]:
        logging.root.removeHandler(existing_handler)

    # Configure logging based on command line arguments
    handler = logging.StreamHandler()
    if args.simple_logs:
        log_format = "void - %(levelname)s - %(message)s"
        handler.setFormatter(logging.Formatter(log_format))
    else:
        # Create custom formatter with symbols
        class SymbolFormatter(logging.Formatter):
            """Custom formatter that adds symbols for different log levels"""

            # NOTE(review): every symbol and both separators are empty
            # strings as written here; they are kept verbatim.
            SYMBOLS = {
                logging.DEBUG: '',
                logging.INFO: '',
                logging.WARNING: '',
                logging.ERROR: '',
                logging.CRITICAL: ''
            }

            def format(self, record):
                # Get the symbol for this log level
                symbol = self.SYMBOLS.get(record.levelno, '')

                # Format time as HH:MM:SS
                timestamp = self.formatTime(record, "%H:%M:%S")

                # Build the formatted message
                level_name = f"{record.levelname:<5}"  # Left-align, 5 chars

                parts = [symbol, timestamp, '', level_name, '', record.getMessage()]

                return ' '.join(parts)

        handler.setFormatter(SymbolFormatter())

    # Configure root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(handler)

    logger = logging.getLogger("void_bot")
    logger.setLevel(logging.INFO)

    # Create a separate logger for prompts (set to WARNING to hide by default)
    prompt_logger = logging.getLogger("void_bot.prompts")
    if args.reasoning:
        prompt_logger.setLevel(logging.INFO)  # Show reasoning when --reasoning is used
    else:
        prompt_logger.setLevel(logging.WARNING)  # Hide by default

    # Disable httpx logging completely
    logging.getLogger("httpx").setLevel(logging.CRITICAL)

    TESTING_MODE = args.test

    # Store no-git flag globally for use in export_agent_state calls
    SKIP_GIT = args.no_git

    # Store reasoning flag globally
    SHOW_REASONING = args.reasoning

    if TESTING_MODE:
        logger.info("=== RUNNING IN TESTING MODE ===")
        logger.info(" - No messages will be sent to Bluesky")
        logger.info(" - Queue files will not be deleted")
        logger.info(" - Notifications will not be marked as seen")
        print("\n")

    # Check for synthesis-only mode
    SYNTHESIS_ONLY = args.synthesis_only
    if SYNTHESIS_ONLY:
        logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
        logger.info(" - Only synthesis messages will be sent")
        logger.info(" - No notification processing")
        logger.info(" - No Bluesky client needed")
        print("\n")

    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Initialize notification database
    logger.info("Initializing notification database...")
    NOTIFICATION_DB = NotificationDB()

    # Migrate from old JSON format if it exists
    if PROCESSED_NOTIFICATIONS_FILE.exists():
        logger.info("Found old processed_notifications.json, migrating to database...")
        NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))

    # Log database stats
    db_stats = NOTIFICATION_DB.get_stats()
    logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")

    # Clean up old records
    NOTIFICATION_DB.cleanup_old_records(days=7)

    # Ensure correct tools are attached for Bluesky
    logger.info("Configuring tools for Bluesky platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('bluesky', void_agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Clean up all user blocks at startup
    logger.info("🧹 Cleaning up user blocks at startup...")
    periodic_user_block_cleanup(CLIENT, void_agent.id)

    # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
    if not SYNTHESIS_ONLY:
        atproto_client = bsky_utils.default_login()
        logger.info("Connected to Bluesky")
    elif not args.test:
        # In synthesis-only mode, still connect for acks and posts (unless in test mode)
        atproto_client = bsky_utils.default_login()
        logger.info("Connected to Bluesky (for synthesis acks/posts)")
    else:
        atproto_client = None
        logger.info("Skipping Bluesky connection (test mode)")

    # Configure intervals
    CLEANUP_INTERVAL = args.cleanup_interval
    SYNTHESIS_INTERVAL = args.synthesis_interval

    # Synthesis-only mode
    if SYNTHESIS_ONLY:
        if SYNTHESIS_INTERVAL <= 0:
            logger.error("Synthesis-only mode requires --synthesis-interval > 0")
            return

        logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")

        while True:
            try:
                # Send synthesis message immediately on first run
                logger.info("🧠 Sending synthesis message")
                send_synthesis_message(CLIENT, void_agent.id, atproto_client)

                # Wait for next interval
                logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
                sleep(SYNTHESIS_INTERVAL)

            except KeyboardInterrupt:
                logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
                # Return rather than break: leaving only this loop would
                # fall through into the normal notification loop below,
                # possibly with atproto_client set to None.
                return
            except Exception as e:
                logger.error(f"Error in synthesis loop: {e}")
                logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
                sleep(SYNTHESIS_INTERVAL)

    # Normal mode with notification processing
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0

    if CLEANUP_INTERVAL > 0:
        logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
    else:
        logger.info("User block cleanup disabled")

    if SYNTHESIS_INTERVAL > 0:
        logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
    else:
        logger.info("Synthesis messages disabled")

    while True:
        try:
            cycle_count += 1
            process_notifications(void_agent, atproto_client, TESTING_MODE)

            # Check if synthesis interval has passed
            if SYNTHESIS_INTERVAL > 0:
                current_time = time.time()
                if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
                    logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
                    send_synthesis_message(CLIENT, void_agent.id, atproto_client)
                    last_synthesis_time = current_time

            # Run periodic cleanup every N cycles
            if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                periodic_user_block_cleanup(CLIENT, void_agent.id)

                # Also check database health when doing cleanup
                if NOTIFICATION_DB:
                    db_stats = NOTIFICATION_DB.get_stats()
                    pending = db_stats.get('status_pending', 0)
                    errors = db_stats.get('status_error', 0)

                    if pending > 50:
                        logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
                    if errors > 20:
                        logger.warning(f"⚠️ Queue health check: {errors} error notifications")

                    # Old-record cleanup runs on every 10th cleanup pass
                    # (every 100 cycles with the default interval of 10).
                    # Kept inside the CLEANUP_INTERVAL > 0 guard so the
                    # modulus can never be zero.
                    if cycle_count % (CLEANUP_INTERVAL * 10) == 0:
                        logger.info("Running database cleanup of old records...")
                        NOTIFICATION_DB.cleanup_old_records(days=7)

            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f" - {message_counters['mentions']} mentions")
            logger.info(f" - {message_counters['replies']} replies")
            logger.info(f" - {message_counters['follows']} follows")
            logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")

            # Close database connection
            if NOTIFICATION_DB:
                logger.info("Closing database connection...")
                NOTIFICATION_DB.close()

            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()