A digital person for Bluesky
# Rich imports removed - using simple text formatting
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time
import argparse

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils
from tools.blocks import attach_user_blocks, detach_user_blocks
from datetime import date
from notification_db import NotificationDB


def extract_handles_from_data(data):
    """Collect every unique handle found anywhere in a nested structure.

    Walks ``data`` through nested dicts and lists and records the value
    stored under each ``'handle'`` key.

    Args:
        data: A dict, a list, or any other object. Objects that are neither
            dicts nor lists are ignored.

    Returns:
        A list of unique handle strings in first-seen (document) order.
        ``'handle'`` values that are not strings (``None``, nested dicts,
        ...) are not recorded, but containers are still searched.
    """
    # A dict doubles as an ordered set, so the result order is deterministic
    # across runs (a plain set of strings is not, due to hash randomization).
    seen = {}

    # Explicit stack instead of recursion: deeply nested threads cannot hit
    # the interpreter's recursion limit.
    pending = [data]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            handle = node.get('handle')
            if isinstance(handle, str):
                seen.setdefault(handle, None)
            # Push children reversed so they are popped in original order.
            pending.extend(reversed(list(node.values())))
        elif isinstance(node, list):
            pending.extend(reversed(node))

    return list(seen)


# Logging will be configured after argument parsing
logger = None
prompt_logger = None
# Simple text formatting (Rich no longer used)
SHOW_REASONING = False
last_archival_query = "archival memory search"


def log_with_panel(message, title=None, border_color="white"):
    """Print a message, optionally under a titled header with a Unicode rule.

    With a title the output is a blank line, ``<symbol> <title>``, a rule of
    box-drawing characters as long as the title, then the message with every
    line indented. Without a title the message is printed unchanged.

    Args:
        message: Text to print. Non-string values are converted with
            ``str()`` when a title is given.
        title: Header text. When empty or ``None`` no header is printed.
        border_color: Legacy color name used only to select the header
            symbol. Unknown names fall back to the default symbol.
    """
    if not title:
        print(message)
        return

    # Map old color names to appropriate symbols
    symbol_map = {
        "blue": "⚙",    # Tool calls - NOTE(review): confirm preferred glyph
        "green": "✓",   # Success/completion
        "yellow": "◆",  # Reasoning
        "red": "✗",     # Errors
        "white": "▶",   # Default/mentions
        "cyan": "✎",    # Posts
    }
    symbol = symbol_map.get(border_color, symbol_map["white"])

    print(f"\n{symbol} {title}")
    # Two-column indent lines the rule and the body up under the title text
    # (past the symbol and the space that follows it).
    print(f"  {'─' * len(title)}")
    # Indent message lines
    for line in str(message).split('\n'):
        print(f"  {line}")


# Create a client
with extended timeout for LLM operations 77CLIENT= Letta( 78 token=os.environ["LETTA_API_KEY"], 79 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 80) 81 82# Use the "Bluesky" project 83PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 84 85# Notification check delay 86FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 87 88# Check for new notifications every N queue items 89CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 90 91# Queue directory 92QUEUE_DIR = Path("queue") 93QUEUE_DIR.mkdir(exist_ok=True) 94QUEUE_ERROR_DIR = Path("queue/errors") 95QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 96QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 97QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 98PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 99 100# Maximum number of processed notifications to track 101MAX_PROCESSED_NOTIFICATIONS = 10000 102 103# Message tracking counters 104message_counters = defaultdict(int) 105start_time = time.time() 106 107# Testing mode flag 108TESTING_MODE = False 109 110# Skip git operations flag 111SKIP_GIT = False 112 113# Synthesis message tracking 114last_synthesis_time = time.time() 115 116# Database for notification tracking 117NOTIFICATION_DB = None 118 119def export_agent_state(client, agent, skip_git=False): 120 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 121 try: 122 # Confirm export with user unless git is being skipped 123 if not skip_git: 124 response = input("Export agent state to files and stage with git? 
(y/n): ").lower().strip() 125 if response not in ['y', 'yes']: 126 logger.info("Agent export cancelled by user.") 127 return 128 else: 129 logger.info("Exporting agent state (git staging disabled)") 130 131 # Create directories if they don't exist 132 os.makedirs("agent_archive", exist_ok=True) 133 os.makedirs("agents", exist_ok=True) 134 135 # Export agent data 136 logger.info(f"Exporting agent {agent.id}. This takes some time...") 137 agent_data = client.agents.export_file(agent_id=agent.id) 138 139 # Save timestamped archive copy 140 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 141 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 142 with open(archive_file, 'w', encoding='utf-8') as f: 143 json.dump(agent_data, f, indent=2, ensure_ascii=False) 144 145 # Save current agent state 146 current_file = os.path.join("agents", "void.af") 147 with open(current_file, 'w', encoding='utf-8') as f: 148 json.dump(agent_data, f, indent=2, ensure_ascii=False) 149 150 logger.info(f"Agent exported to {archive_file} and {current_file}") 151 152 # Git add only the current agent file (archive is ignored) unless skip_git is True 153 if not skip_git: 154 try: 155 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 156 logger.info("Added current agent file to git staging") 157 except subprocess.CalledProcessError as e: 158 logger.warning(f"Failed to git add agent file: {e}") 159 160 except Exception as e: 161 logger.error(f"Failed to export agent: {e}") 162 163def initialize_void(): 164 logger.info("Starting void agent initialization...") 165 166 # Get the configured void agent by ID 167 logger.info("Loading void agent from config...") 168 from config_loader import get_letta_config 169 letta_config = get_letta_config() 170 agent_id = letta_config['agent_id'] 171 172 try: 173 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 174 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 175 except Exception 
as e: 176 logger.error(f"Failed to load void agent {agent_id}: {e}") 177 logger.error("Please ensure the agent_id in config.yaml is correct") 178 raise e 179 180 # Export agent state 181 logger.info("Exporting agent state...") 182 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 183 184 # Log agent details 185 logger.info(f"Void agent details - ID: {void_agent.id}") 186 logger.info(f"Agent name: {void_agent.name}") 187 if hasattr(void_agent, 'llm_config'): 188 logger.info(f"Agent model: {void_agent.llm_config.model}") 189 logger.info(f"Agent project_id: {void_agent.project_id}") 190 if hasattr(void_agent, 'tools'): 191 logger.info(f"Agent has {len(void_agent.tools)} tools") 192 for tool in void_agent.tools[:3]: # Show first 3 tools 193 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 194 195 return void_agent 196 197 198def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 199 """Process a mention and generate a reply using the Letta agent. 
200 201 Args: 202 void_agent: The Letta agent instance 203 atproto_client: The AT Protocol client 204 notification_data: The notification data dictionary 205 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 206 207 Returns: 208 True: Successfully processed, remove from queue 209 False: Failed but retryable, keep in queue 210 None: Failed with non-retryable error, move to errors directory 211 "no_reply": No reply was generated, move to no_reply directory 212 """ 213 import uuid 214 215 # Generate correlation ID for tracking this notification through the pipeline 216 correlation_id = str(uuid.uuid4())[:8] 217 218 try: 219 logger.info(f"[{correlation_id}] Starting process_mention", extra={ 220 'correlation_id': correlation_id, 221 'notification_type': type(notification_data).__name__ 222 }) 223 224 # Handle both dict and object inputs for backwards compatibility 225 if isinstance(notification_data, dict): 226 uri = notification_data['uri'] 227 mention_text = notification_data.get('record', {}).get('text', '') 228 author_handle = notification_data['author']['handle'] 229 author_name = notification_data['author'].get('display_name') or author_handle 230 else: 231 # Legacy object access 232 uri = notification_data.uri 233 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 234 author_handle = notification_data.author.handle 235 author_name = notification_data.author.display_name or author_handle 236 237 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={ 238 'correlation_id': correlation_id, 239 'author_handle': author_handle, 240 'author_name': author_name, 241 'mention_uri': uri, 242 'mention_text_length': len(mention_text), 243 'mention_preview': mention_text[:100] if mention_text else '' 244 }) 245 246 # Retrieve the entire thread associated with the mention 247 try: 248 thread = atproto_client.app.bsky.feed.get_post_thread({ 249 'uri': uri, 250 'parent_height': 
40, 251 'depth': 10 252 }) 253 except Exception as e: 254 error_str = str(e) 255 # Check if this is a NotFound error 256 if 'NotFound' in error_str or 'Post not found' in error_str: 257 logger.warning(f"Post not found for URI {uri}, removing from queue") 258 return True # Return True to remove from queue 259 else: 260 # Re-raise other errors 261 logger.error(f"Error fetching thread: {e}") 262 raise 263 264 # Get thread context as YAML string 265 logger.debug("Converting thread to YAML string") 266 try: 267 thread_context = thread_to_yaml_string(thread) 268 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 269 270 # Check if #voidstop appears anywhere in the thread 271 if "#voidstop" in thread_context.lower(): 272 logger.info("Found #voidstop in thread context, skipping this mention") 273 return True # Return True to remove from queue 274 275 # Also check the mention text directly 276 if "#voidstop" in mention_text.lower(): 277 logger.info("Found #voidstop in mention text, skipping this mention") 278 return True # Return True to remove from queue 279 280 # Create a more informative preview by extracting meaningful content 281 lines = thread_context.split('\n') 282 meaningful_lines = [] 283 284 for line in lines: 285 stripped = line.strip() 286 if not stripped: 287 continue 288 289 # Look for lines with actual content (not just structure) 290 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 291 meaningful_lines.append(line) 292 if len(meaningful_lines) >= 5: 293 break 294 295 if meaningful_lines: 296 preview = '\n'.join(meaningful_lines) 297 logger.debug(f"Thread content preview:\n{preview}") 298 else: 299 # If no content fields found, just show it's a thread structure 300 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 301 except Exception as yaml_error: 302 import traceback 303 logger.error(f"Error converting thread to YAML: 
{yaml_error}") 304 logger.error(f"Full traceback:\n{traceback.format_exc()}") 305 logger.error(f"Thread type: {type(thread)}") 306 if hasattr(thread, '__dict__'): 307 logger.error(f"Thread attributes: {thread.__dict__}") 308 # Try to continue with a simple context 309 thread_context = f"Error processing thread context: {str(yaml_error)}" 310 311 # Create a prompt for the Letta agent with thread context 312 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 313 314MOST RECENT POST (the mention you're responding to): 315"{mention_text}" 316 317FULL THREAD CONTEXT: 318```yaml 319{thread_context} 320``` 321 322The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 323 324To reply, use the add_post_to_bluesky_reply_thread tool: 325- Each call creates one post (max 300 characters) 326- For most responses, a single call is sufficient 327- Only use multiple calls for threaded replies when: 328 * The topic requires extended explanation that cannot fit in 300 characters 329 * You're explicitly asked for a detailed/long response 330 * The conversation naturally benefits from a structured multi-part answer 331- Avoid unnecessary threads - be concise when possible""" 332 333 # Extract all handles from notification and thread data 334 all_handles = set() 335 all_handles.update(extract_handles_from_data(notification_data)) 336 all_handles.update(extract_handles_from_data(thread.model_dump())) 337 unique_handles = list(all_handles) 338 339 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 340 341 # Check if any handles are in known_bots list 342 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 343 import json 344 345 try: 346 # Check for known bots in thread 347 bot_check_result = 
check_known_bots(unique_handles, void_agent) 348 bot_check_data = json.loads(bot_check_result) 349 350 # TEMPORARILY DISABLED: Bot detection causing issues with normal users 351 # TODO: Re-enable after debugging why normal users are being flagged as bots 352 if False: # bot_check_data.get("bot_detected", False): 353 detected_bots = bot_check_data.get("detected_bots", []) 354 logger.info(f"Bot detected in thread: {detected_bots}") 355 356 # Decide whether to respond (10% chance) 357 if not should_respond_to_bot_thread(): 358 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 359 # Return False to keep in queue for potential later processing 360 return False 361 else: 362 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 363 else: 364 logger.debug("Bot detection disabled - processing all notifications") 365 366 except Exception as bot_check_error: 367 logger.warning(f"Error checking for bots: {bot_check_error}") 368 # Continue processing if bot check fails 369 370 # Attach user blocks before agent call 371 attached_handles = [] 372 if unique_handles: 373 try: 374 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 375 attach_result = attach_user_blocks(unique_handles, void_agent) 376 attached_handles = unique_handles # Track successfully attached handles 377 logger.debug(f"Attach result: {attach_result}") 378 except Exception as attach_error: 379 logger.warning(f"Failed to attach user blocks: {attach_error}") 380 # Continue without user blocks rather than failing completely 381 382 # Get response from Letta agent 383 # Format with Unicode characters 384 title = f"MENTION FROM @{author_handle}" 385 print(f"\n{title}") 386 print(f" {'' * len(title)}") 387 # Indent the mention text 388 for line in mention_text.split('\n'): 389 print(f" {line}") 390 391 # Log prompt details to separate logger 392 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 393 394 # Log concise 
prompt info to main logger 395 thread_handles_count = len(unique_handles) 396 prompt_char_count = len(prompt) 397 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 398 399 try: 400 # Use streaming to avoid 524 timeout errors 401 message_stream = CLIENT.agents.messages.create_stream( 402 agent_id=void_agent.id, 403 messages=[{"role": "user", "content": prompt}], 404 stream_tokens=False, # Step streaming only (faster than token streaming) 405 max_steps=100 406 ) 407 408 # Collect the streaming response 409 all_messages = [] 410 for chunk in message_stream: 411 # Log condensed chunk info 412 if hasattr(chunk, 'message_type'): 413 if chunk.message_type == 'reasoning_message': 414 # Show full reasoning without truncation 415 if SHOW_REASONING: 416 # Format with Unicode characters 417 print("\n◆ Reasoning") 418 print(" ─────────") 419 # Indent reasoning lines 420 for line in chunk.reasoning.split('\n'): 421 print(f" {line}") 422 else: 423 # Default log format (only when --reasoning is used due to log level) 424 # Format with Unicode characters 425 print("\n◆ Reasoning") 426 print(" ─────────") 427 # Indent reasoning lines 428 for line in chunk.reasoning.split('\n'): 429 print(f" {line}") 430 431 # Create ATProto record for reasoning (unless in testing mode) 432 if not testing_mode and hasattr(chunk, 'reasoning'): 433 try: 434 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 435 except Exception as e: 436 logger.debug(f"Failed to create reasoning record: {e}") 437 elif chunk.message_type == 'tool_call_message': 438 # Parse tool arguments for better display 439 tool_name = chunk.tool_call.name 440 441 # Create ATProto record for tool call (unless in testing mode) 442 if not testing_mode: 443 try: 444 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 445 
bsky_utils.create_tool_call_record( 446 atproto_client, 447 tool_name, 448 chunk.tool_call.arguments, 449 tool_call_id 450 ) 451 except Exception as e: 452 logger.debug(f"Failed to create tool call record: {e}") 453 454 try: 455 args = json.loads(chunk.tool_call.arguments) 456 # Format based on tool type 457 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 458 # Extract the text being posted 459 text = args.get('text', '') 460 if text: 461 # Format with Unicode characters 462 print("\n✎ Bluesky Post") 463 print(" ────────────") 464 # Indent post text 465 for line in text.split('\n'): 466 print(f" {line}") 467 else: 468 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 469 elif tool_name == 'archival_memory_search': 470 query = args.get('query', 'unknown') 471 global last_archival_query 472 last_archival_query = query 473 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 474 elif tool_name == 'archival_memory_insert': 475 content = args.get('content', '') 476 # Show the full content being inserted 477 log_with_panel(content, f"Tool call: {tool_name}", "blue") 478 elif tool_name == 'update_block': 479 label = args.get('label', 'unknown') 480 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 481 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 482 else: 483 # Generic display for other tools 484 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 485 if len(args_str) > 150: 486 args_str = args_str[:150] + "..." 
487 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 488 except: 489 # Fallback to original format if parsing fails 490 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 491 elif chunk.message_type == 'tool_return_message': 492 # Enhanced tool result logging 493 tool_name = chunk.name 494 status = chunk.status 495 496 if status == 'success': 497 # Try to show meaningful result info based on tool type 498 if hasattr(chunk, 'tool_return') and chunk.tool_return: 499 result_str = str(chunk.tool_return) 500 if tool_name == 'archival_memory_search': 501 502 try: 503 # Handle both string and list formats 504 if isinstance(chunk.tool_return, str): 505 # The string format is: "([{...}, {...}], count)" 506 # We need to extract just the list part 507 if chunk.tool_return.strip(): 508 # Find the list part between the first [ and last ] 509 start_idx = chunk.tool_return.find('[') 510 end_idx = chunk.tool_return.rfind(']') 511 if start_idx != -1 and end_idx != -1: 512 list_str = chunk.tool_return[start_idx:end_idx+1] 513 # Use ast.literal_eval since this is Python literal syntax, not JSON 514 import ast 515 results = ast.literal_eval(list_str) 516 else: 517 logger.warning("Could not find list in archival_memory_search result") 518 results = [] 519 else: 520 logger.warning("Empty string returned from archival_memory_search") 521 results = [] 522 else: 523 # If it's already a list, use directly 524 results = chunk.tool_return 525 526 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 527 528 # Use the captured search query from the tool call 529 search_query = last_archival_query 530 531 # Combine all results into a single text block 532 content_text = "" 533 for i, entry in enumerate(results, 1): 534 timestamp = entry.get('timestamp', 'N/A') 535 content = entry.get('content', '') 536 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 537 538 # Format with Unicode 
characters 539 title = f"{search_query} ({len(results)} results)" 540 print(f"\n{title}") 541 print(f" {'' * len(title)}") 542 # Indent content text 543 for line in content_text.strip().split('\n'): 544 print(f" {line}") 545 546 except Exception as e: 547 logger.error(f"Error formatting archival memory results: {e}") 548 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 549 elif tool_name == 'add_post_to_bluesky_reply_thread': 550 # Just show success for bluesky posts, the text was already shown in tool call 551 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 552 elif tool_name == 'archival_memory_insert': 553 # Skip archival memory insert results (always returns None) 554 pass 555 elif tool_name == 'update_block': 556 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 557 else: 558 # Generic success with preview 559 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 560 log_with_panel(preview, f"Tool result: {tool_name}", "green") 561 else: 562 log_with_panel("Success", f"Tool result: {tool_name}", "green") 563 elif status == 'error': 564 # Show error details 565 if tool_name == 'add_post_to_bluesky_reply_thread': 566 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 567 log_with_panel(error_str, f"Bluesky Post ✗", "red") 568 elif tool_name == 'archival_memory_insert': 569 # Skip archival memory insert errors too 570 pass 571 else: 572 error_preview = "" 573 if hasattr(chunk, 'tool_return') and chunk.tool_return: 574 error_str = str(chunk.tool_return) 575 error_preview = error_str[:100] + "..." 
if len(error_str) > 100 else error_str 576 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 577 else: 578 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 579 else: 580 logger.info(f"Tool result: {tool_name} - {status}") 581 elif chunk.message_type == 'assistant_message': 582 # Format with Unicode characters 583 print("\n▶ Assistant Response") 584 print(" ──────────────────") 585 # Indent response text 586 for line in chunk.content.split('\n'): 587 print(f" {line}") 588 else: 589 # Filter out verbose message types 590 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 591 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 592 else: 593 logger.info(f"📦 Stream status: {chunk}") 594 595 # Log full chunk for debugging 596 logger.debug(f"Full streaming chunk: {chunk}") 597 all_messages.append(chunk) 598 if str(chunk) == 'done': 599 break 600 601 # Convert streaming response to standard format for compatibility 602 message_response = type('StreamingResponse', (), { 603 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 604 })() 605 except Exception as api_error: 606 import traceback 607 error_str = str(api_error) 608 logger.error(f"Letta API error: {api_error}") 609 logger.error(f"Error type: {type(api_error).__name__}") 610 logger.error(f"Full traceback:\n{traceback.format_exc()}") 611 logger.error(f"Mention text was: {mention_text}") 612 logger.error(f"Author: @{author_handle}") 613 logger.error(f"URI: {uri}") 614 615 616 # Try to extract more info from different error types 617 if hasattr(api_error, 'response'): 618 logger.error(f"Error response object exists") 619 if hasattr(api_error.response, 'text'): 620 logger.error(f"Response text: {api_error.response.text}") 621 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 622 try: 623 logger.error(f"Response JSON: {api_error.response.json()}") 624 except: 625 pass 626 627 # Check for specific error 
types 628 if hasattr(api_error, 'status_code'): 629 logger.error(f"API Status code: {api_error.status_code}") 630 if hasattr(api_error, 'body'): 631 logger.error(f"API Response body: {api_error.body}") 632 if hasattr(api_error, 'headers'): 633 logger.error(f"API Response headers: {api_error.headers}") 634 635 if api_error.status_code == 413: 636 logger.error("413 Payload Too Large - moving to errors directory") 637 return None # Move to errors directory - payload is too large to ever succeed 638 elif api_error.status_code == 524: 639 logger.error("524 error - timeout from Cloudflare, will retry later") 640 return False # Keep in queue for retry 641 642 # Check if error indicates we should remove from queue 643 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 644 logger.warning("Payload too large error, moving to errors directory") 645 return None # Move to errors directory - cannot be fixed by retry 646 elif 'status_code: 524' in error_str: 647 logger.warning("524 timeout error, keeping in queue for retry") 648 return False # Keep in queue for retry 649 650 raise 651 652 # Log successful response 653 logger.debug("Successfully received response from Letta API") 654 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 655 656 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 657 reply_candidates = [] 658 tool_call_results = {} # Map tool_call_id to status 659 ack_note = None # Track any note from annotate_ack tool 660 661 logger.debug(f"Processing {len(message_response.messages)} response messages...") 662 663 # First pass: collect tool return statuses 664 ignored_notification = False 665 ignore_reason = "" 666 ignore_category = "" 667 668 for message in message_response.messages: 669 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 670 if message.name == 
'add_post_to_bluesky_reply_thread': 671 tool_call_results[message.tool_call_id] = message.status 672 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 673 elif message.name == 'ignore_notification': 674 # Check if the tool was successful 675 if hasattr(message, 'tool_return') and message.status == 'success': 676 # Parse the return value to extract category and reason 677 result_str = str(message.tool_return) 678 if 'IGNORED_NOTIFICATION::' in result_str: 679 parts = result_str.split('::') 680 if len(parts) >= 3: 681 ignore_category = parts[1] 682 ignore_reason = parts[2] 683 ignored_notification = True 684 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 685 elif message.name == 'bluesky_reply': 686 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 687 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 688 logger.error("Update the agent's tools using register_tools.py") 689 # Export agent state before terminating 690 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 691 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 692 exit(1) 693 694 # Second pass: process messages and check for successful tool calls 695 for i, message in enumerate(message_response.messages, 1): 696 # Log concise message info instead of full object 697 msg_type = getattr(message, 'message_type', 'unknown') 698 if hasattr(message, 'reasoning') and message.reasoning: 699 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 700 elif hasattr(message, 'tool_call') and message.tool_call: 701 tool_name = message.tool_call.name 702 logger.debug(f" {i}. {msg_type}: {tool_name}") 703 elif hasattr(message, 'tool_return'): 704 tool_name = getattr(message, 'name', 'unknown_tool') 705 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 706 status = getattr(message, 'status', 'unknown') 707 logger.debug(f" {i}. 
{msg_type}: {tool_name} -> {return_preview}... (status: {status})") 708 elif hasattr(message, 'text'): 709 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 710 else: 711 logger.debug(f" {i}. {msg_type}: <no content>") 712 713 # Check for halt_activity tool call 714 if hasattr(message, 'tool_call') and message.tool_call: 715 if message.tool_call.name == 'halt_activity': 716 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 717 try: 718 args = json.loads(message.tool_call.arguments) 719 reason = args.get('reason', 'Agent requested halt') 720 logger.info(f"Halt reason: {reason}") 721 except: 722 logger.info("Halt reason: <unable to parse>") 723 724 # Delete the queue file before terminating 725 if queue_filepath and queue_filepath.exists(): 726 queue_filepath.unlink() 727 logger.info(f"Deleted queue file: {queue_filepath.name}") 728 729 # Also mark as processed to avoid reprocessing 730 if NOTIFICATION_DB: 731 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed') 732 else: 733 processed_uris = load_processed_notifications() 734 processed_uris.add(notification_data.get('uri', '')) 735 save_processed_notifications(processed_uris) 736 737 # Export agent state before terminating 738 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 739 740 # Exit the program 741 logger.info("=== BOT TERMINATED BY AGENT ===") 742 exit(0) 743 744 # Check for deprecated bluesky_reply tool 745 if hasattr(message, 'tool_call') and message.tool_call: 746 if message.tool_call.name == 'bluesky_reply': 747 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 748 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 749 logger.error("Update the agent's tools using register_tools.py") 750 # Export agent state before terminating 751 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 752 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 753 exit(1) 754 755 # Collect annotate_ack 
tool calls 756 elif message.tool_call.name == 'annotate_ack': 757 try: 758 args = json.loads(message.tool_call.arguments) 759 note = args.get('note', '') 760 if note: 761 ack_note = note 762 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 763 except json.JSONDecodeError as e: 764 logger.error(f"Failed to parse annotate_ack arguments: {e}") 765 766 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 767 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 768 tool_call_id = message.tool_call.tool_call_id 769 tool_status = tool_call_results.get(tool_call_id, 'unknown') 770 771 if tool_status == 'success': 772 try: 773 args = json.loads(message.tool_call.arguments) 774 reply_text = args.get('text', '') 775 reply_lang = args.get('lang', 'en-US') 776 777 if reply_text: # Only add if there's actual content 778 reply_candidates.append((reply_text, reply_lang)) 779 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... 
(lang: {reply_lang})") 780 except json.JSONDecodeError as e: 781 logger.error(f"Failed to parse tool call arguments: {e}") 782 elif tool_status == 'error': 783 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 784 else: 785 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 786 787 # Check for conflicting tool calls 788 if reply_candidates and ignored_notification: 789 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 790 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 791 logger.warning("Item will be left in queue for manual review") 792 # Return False to keep in queue 793 return False 794 795 if reply_candidates: 796 # Aggregate reply posts into a thread 797 reply_messages = [] 798 reply_langs = [] 799 for text, lang in reply_candidates: 800 reply_messages.append(text) 801 reply_langs.append(lang) 802 803 # Use the first language for the entire thread (could be enhanced later) 804 reply_lang = reply_langs[0] if reply_langs else 'en-US' 805 806 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 807 808 # Display the generated reply thread 809 if len(reply_messages) == 1: 810 content = reply_messages[0] 811 title = f"Reply to @{author_handle}" 812 else: 813 content = "\n\n".join([f"{j}. 
{msg}" for j, msg in enumerate(reply_messages, 1)]) 814 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 815 816 # Format with Unicode characters 817 print(f"\n{title}") 818 print(f" {'' * len(title)}") 819 # Indent content lines 820 for line in content.split('\n'): 821 print(f" {line}") 822 823 # Send the reply(s) with language (unless in testing mode) 824 if testing_mode: 825 logger.info("TESTING MODE: Skipping actual Bluesky post") 826 response = True # Simulate success 827 else: 828 if len(reply_messages) == 1: 829 # Single reply - use existing function 830 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 831 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 832 response = bsky_utils.reply_to_notification( 833 client=atproto_client, 834 notification=notification_data, 835 reply_text=cleaned_text, 836 lang=reply_lang, 837 correlation_id=correlation_id 838 ) 839 else: 840 # Multiple replies - use new threaded function 841 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 842 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 843 response = bsky_utils.reply_with_thread_to_notification( 844 client=atproto_client, 845 notification=notification_data, 846 reply_messages=cleaned_messages, 847 lang=reply_lang, 848 correlation_id=correlation_id 849 ) 850 851 if response: 852 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={ 853 'correlation_id': correlation_id, 854 'author_handle': author_handle, 855 'reply_count': len(reply_messages) 856 }) 857 858 # Acknowledge the post we're replying to with stream.thought.ack 859 try: 860 post_uri = notification_data.get('uri') 861 post_cid = notification_data.get('cid') 862 863 if post_uri and post_cid: 864 ack_result = bsky_utils.acknowledge_post( 865 client=atproto_client, 866 post_uri=post_uri, 867 post_cid=post_cid, 868 note=ack_note 869 
) 870 if ack_result: 871 if ack_note: 872 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 873 else: 874 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 875 else: 876 logger.warning(f"Failed to acknowledge post from @{author_handle}") 877 else: 878 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 879 except Exception as e: 880 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 881 # Don't fail the entire operation if acknowledgment fails 882 883 return True 884 else: 885 logger.error(f"Failed to send reply to @{author_handle}") 886 return False 887 else: 888 # Check if notification was explicitly ignored 889 if ignored_notification: 890 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={ 891 'correlation_id': correlation_id, 892 'author_handle': author_handle, 893 'ignore_category': ignore_category 894 }) 895 return "ignored" 896 else: 897 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={ 898 'correlation_id': correlation_id, 899 'author_handle': author_handle 900 }) 901 return "no_reply" 902 903 except Exception as e: 904 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={ 905 'correlation_id': correlation_id, 906 'error': str(e), 907 'error_type': type(e).__name__, 908 'author_handle': author_handle if 'author_handle' in locals() else 'unknown' 909 }) 910 return False 911 finally: 912 # Detach user blocks after agent response (success or failure) 913 if 'attached_handles' in locals() and attached_handles: 914 try: 915 logger.info(f"Detaching user blocks for handles: {attached_handles}") 916 detach_result = detach_user_blocks(attached_handles, void_agent) 917 logger.debug(f"Detach result: {detach_result}") 918 except 
def notification_to_dict(notification):
    """Flatten a notification object into a JSON-serializable dictionary.

    Copies the identifiers, reason, read flag, index time, the author's
    handle/display name/DID and the record text. The text falls back to an
    empty string when the notification has no ``record`` attribute or the
    record has no ``text`` attribute.
    """
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')
    else:
        record_text = ''

    author = notification.author
    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': author.handle,
            'display_name': author.display_name,
            'did': author.did
        },
        'record': {
            'text': record_text
        }
    }


def load_processed_notifications():
    """Return the processed notification URIs known to the database.

    Returns whatever ``NOTIFICATION_DB.get_processed_uris`` yields (capped at
    ``MAX_PROCESSED_NOTIFICATIONS``), or an empty set when no notification
    database is configured.
    """
    if not NOTIFICATION_DB:
        return set()
    return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)


def save_processed_notifications(processed_set):
    """Do nothing; kept only so existing callers keep working.

    Processed URIs are recorded per notification in the database, so there is
    nothing left to persist here.
    """
    return None


def save_notification_to_queue(notification, is_priority=None):
    """Write a notification to the queue directory as a JSON file.

    The filename is ``<prefix><timestamp>_<reason>_<hash>.json`` where the
    prefix is ``0_`` for priority items and ``1_`` otherwise, so sorting by
    name yields priority order.

    Args:
        notification: A notification object or an already-converted dict.
        is_priority: ``True``/``False`` forces the priority prefix. ``None``
            grants priority only to notifications from cameron.pfiffer.org.

    Returns:
        True when a new queue file was written. False when the notification
        was already processed, already queued, refused by the database, or
        when any error occurred (errors are logged, never raised).
    """
    try:
        # Handle both notification objects and dicts
        if isinstance(notification, dict):
            notif_dict = notification
            notification_uri = notification.get('uri')
        else:
            notif_dict = notification_to_dict(notification)
            notification_uri = notification.uri

        # Check if already processed (using database if available)
        if NOTIFICATION_DB:
            if NOTIFICATION_DB.is_processed(notification_uri):
                logger.debug(f"Notification already processed (DB): {notification_uri}")
                return False
            # Add to database - if this fails, don't queue the notification
            if not NOTIFICATION_DB.add_notification(notif_dict):
                logger.warning(f"Failed to add notification to database, skipping: {notification_uri}")
                return False
        else:
            # Fall back to old JSON method
            if notification_uri in load_processed_notifications():
                logger.debug(f"Notification already processed: {notification_uri}")
                return False

        # Hash of the canonical JSON keeps filenames unique per payload
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Determine priority based on author handle or explicit priority
        if is_priority is not None:
            priority_prefix = "0_" if is_priority else "1_"
        else:
            if isinstance(notification, dict):
                author_handle = notification.get('author', {}).get('handle', '')
            else:
                author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
            # Prioritize cameron.pfiffer.org responses
            priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        # Create filename with priority, timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        reason = notif_dict.get('reason', 'unknown')
        filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Check if this notification URI is already in the queue
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r', encoding='utf-8') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification_uri:
                    logger.debug(f"Notification already queued (URI: {notification_uri})")
                    return False
            except Exception:
                # Unreadable or malformed queue files are simply not
                # duplicates. ``Exception`` (not a bare ``except``) lets
                # KeyboardInterrupt/SystemExit propagate.
                continue

        # Write to file
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
def _queue_json_files():
    """Return the queue's JSON files sorted by name (``0_`` priority files first)."""
    return sorted(
        path for path in QUEUE_DIR.glob("*.json")
        if path.name != "processed_notifications.json"
    )


def _record_processed(uri, status):
    """Record ``uri`` as handled so the notification is not queued again."""
    if NOTIFICATION_DB:
        NOTIFICATION_DB.mark_processed(uri, status=status)
    else:
        processed_uris = load_processed_notifications()
        processed_uris.add(uri)
        save_processed_notifications(processed_uris)


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Load and process all notifications from the queue in priority order.

    Like notifications are deleted up front. Every other queue file is
    dispatched on its ``reason`` and then handled according to the outcome:

    * ``"no_reply"`` - file moved to ``QUEUE_NO_REPLY_DIR``
    * ``"ignored"``  - file deleted
    * ``None``       - file moved to ``QUEUE_ERROR_DIR``
    * other truthy   - file deleted (kept when ``testing_mode`` is set)
    * falsy          - file left in the queue for a later retry

    The string sentinels are checked *before* the generic truthiness test;
    non-empty strings are truthy, so testing ``if success`` first would send
    them down the plain-success path.

    Args:
        void_agent: Agent object; its ``id`` is used for follow updates.
        atproto_client: Client handed to the mention processor and to
            ``fetch_and_queue_new_notifications``.
        testing_mode: When True, successfully processed files stay on disk.

    All errors are logged; nothing is raised to the caller.
    """
    try:
        # Filter out and delete like notifications immediately
        queue_files = []
        likes_deleted = 0

        for filepath in _queue_json_files():
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # If it's a like, delete it immediately and don't process
                if notif_data.get('reason') == 'like':
                    filepath.unlink()
                    likes_deleted += 1
                    logger.debug(f"Deleted like notification: {filepath.name}")
                else:
                    queue_files.append(filepath)
            except Exception as e:
                logger.warning(f"Error checking notification file {filepath.name}: {e}")
                queue_files.append(filepath)  # Keep it in case it's valid

        if likes_deleted > 0:
            logger.info(f"Deleted {likes_deleted} like notifications from queue")

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            # Determine if this is a priority notification
            is_priority = filepath.name.startswith("0_")

            # Check for new notifications every N items, and before every
            # priority item after the first.
            should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
            if is_priority and i > 1:
                should_check_notifications = True

            if should_check_notifications:
                logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
                try:
                    # Fetch and queue new notifications without processing them
                    new_count = fetch_and_queue_new_notifications(atproto_client)

                    if new_count > 0:
                        logger.info(f"Added {new_count} new notifications to queue")
                        # NOTE(review): rebinding the name only changes the
                        # totals shown in the log lines below. The running
                        # ``for`` keeps iterating the list it started with, so
                        # newly queued files are picked up on the next call.
                        queue_files = _queue_json_files()
                        logger.info(f"Queue updated: now {len(queue_files)} total items")
                except Exception as e:
                    logger.error(f"Error checking for new notifications: {e}")

            priority_label = " [PRIORITY]" if is_priority else ""
            logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                success = False
                reason = notif_data['reason']
                if reason == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['mentions'] += 1
                elif reason == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['replies'] += 1
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    follow_message = f"Update: {follow_update}"
                    logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": follow_message}]
                    )
                    success = True  # Follow updates are always successful
                    message_counters['follows'] += 1
                elif reason == "repost":
                    # Skip reposts silently but mark as successful to remove from queue
                    success = True
                    message_counters['reposts_skipped'] += 1
                elif reason == "like":
                    # Skip likes silently but mark as successful to remove from queue
                    success = True
                    message_counters.setdefault('likes_skipped', 0)
                    message_counters['likes_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {notif_data['reason']}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result. String sentinels
                # first: they are truthy and must not reach the generic
                # success branch.
                if success == "no_reply":
                    no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                    filepath.rename(no_reply_path)
                    logger.info(f"Moved {filepath.name} to no_reply directory")

                    # Also mark as processed to avoid retrying.
                    # NOTE(review): status 'error' is inherited from the
                    # original code; confirm whether the DB expects a
                    # dedicated status for this outcome.
                    _record_processed(notif_data['uri'], 'error')

                elif success == "ignored":
                    # For ignored notifications, we just delete them (not move to no_reply)
                    filepath.unlink()
                    logger.info(f"🚫 Deleted ignored notification: {filepath.name}")

                    # NOTE(review): status 'error' inherited from the original code.
                    _record_processed(notif_data['uri'], 'error')

                elif success is None:  # Special case for moving to error directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"Moved {filepath.name} to errors directory")

                    # Also mark as processed to avoid retrying
                    _record_processed(notif_data['uri'], 'error')

                elif success:
                    if testing_mode:
                        logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"Successfully processed and removed: {filepath.name}")

                    # Mark as processed to avoid reprocessing
                    _record_processed(notif_data['uri'], 'processed')

                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
def fetch_and_queue_new_notifications(atproto_client):
    """Fetch notifications from the API and queue the new ones.

    Pages through ``list_notifications`` (100 per page, at most 20 pages),
    marks everything as seen, then queues each notification that is not a
    like, not older than the last processed one, and not already processed.
    Nothing is processed here.

    Returns:
        Number of notifications written to the queue; 0 on any error.
    """
    try:
        # Get current time for marking notifications as seen
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        # Timestamp of the last processed notification, used as a filter
        last_processed_time = None
        if NOTIFICATION_DB:
            last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
            if last_processed_time:
                logger.debug(f"Last processed notification was at: {last_processed_time}")

        # Fetch ALL notifications using pagination
        all_notifications = []
        cursor = None
        page_count = 0
        max_pages = 20  # Safety limit to prevent infinite loops

        while page_count < max_pages:
            try:
                params = {'limit': 100}
                if cursor:
                    params = {'cursor': cursor, 'limit': 100}
                notifications_response = atproto_client.app.bsky.notification.list_notifications(
                    params=params
                )

                page_count += 1
                page_notifications = notifications_response.notifications
                if not page_notifications:
                    break
                all_notifications.extend(page_notifications)

                # A missing/empty cursor means this was the last page
                cursor = getattr(notifications_response, 'cursor', None)
                if not cursor:
                    break

            except Exception as e:
                logger.error(f"Error fetching notifications page {page_count}: {e}")
                break

        new_count = 0
        if not all_notifications:
            logger.debug("No new notifications to queue")
            return new_count

        logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")

        # Mark as seen first
        try:
            atproto_client.app.bsky.notification.update_seen(
                data={'seenAt': last_seen_at}
            )
            logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
        except Exception as e:
            logger.error(f"Error marking notifications as seen: {e}")

        # Debug counters
        skipped_read = 0
        skipped_likes = 0
        skipped_processed = 0
        skipped_old_timestamp = 0
        processed_uris = load_processed_notifications()
        priority_keywords = ['urgent', 'priority', 'important', 'emergency']

        # Queue all new notifications (except likes)
        for notif in all_notifications:
            # Skip if older than last processed (when we have timestamp filtering)
            if last_processed_time and hasattr(notif, 'indexed_at') and notif.indexed_at <= last_processed_time:
                skipped_old_timestamp += 1
                logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
                continue

            # is_read is only counted for the summary, never used to skip
            if hasattr(notif, 'is_read') and notif.is_read:
                skipped_read += 1
                logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")

            # Skip likes
            if hasattr(notif, 'reason') and notif.reason == 'like':
                skipped_likes += 1
                continue

            if hasattr(notif, 'model_dump'):
                notif_dict = notif.model_dump()
            else:
                notif_dict = notif

            # Skip likes in dict form too
            if notif_dict.get('reason') == 'like':
                continue

            # Check if already processed
            notif_uri = notif_dict.get('uri', '')
            if notif_uri in processed_uris:
                skipped_processed += 1
                logger.debug(f"Skipping already processed: {notif_uri}")
                continue

            # Priority for cameron.pfiffer.org notifications
            author_handle = notif_dict.get('author', {}).get('handle', '')
            is_priority = author_handle == "cameron.pfiffer.org"

            # Also check for priority keywords in mentions
            if notif_dict.get('reason') == 'mention':
                record = notif_dict.get('record', {})
                lowered_text = record.get('text', '').lower()
                if any(keyword in lowered_text for keyword in priority_keywords):
                    is_priority = True

            if save_notification_to_queue(notif_dict, is_priority=is_priority):
                new_count += 1
                logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")

        # Log summary of filtering
        logger.info(f"📊 Notification processing summary:")
        logger.info(f" • Total fetched: {len(all_notifications)}")
        logger.info(f" • Had is_read=True: {skipped_read} (not skipped)")
        logger.info(f" • Skipped (likes): {skipped_likes}")
        logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}")
        logger.info(f" • Skipped (already processed): {skipped_processed}")
        logger.info(f" • Queued for processing: {new_count}")

        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queueing notifications: {e}")
        return 0


def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Queue newly fetched notifications, then work through the whole queue.

    Errors are logged and never raised to the caller.
    """
    try:
        # Fetch and queue new notifications
        queued_now = fetch_and_queue_new_notifications(atproto_client)
        if queued_now > 0:
            logger.info(f"Found {queued_now} new notifications to process")

        # Now process the entire queue (old + new notifications)
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
def _preview(text, limit):
    """Return ``text`` cut to ``limit`` characters, with "..." appended when cut."""
    return text[:limit] + "..." if len(text) > limit else text


def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
    """
    Send a synthesis prompt to the agent and handle its streamed response.

    Temporal journal blocks are attached before the prompt is sent and are
    always detached afterwards. While streaming, reasoning and tool calls are
    displayed and, when ``atproto_client`` is given, mirrored as ATProto
    records. After the stream ends, an ``annotate_ack`` note (if any) becomes
    a synthesis acknowledgment and each non-empty
    ``add_post_to_bluesky_reply_thread`` text is sent as a post.

    How often this runs is decided by the caller, not here.

    Args:
        client: Letta client
        agent_id: Agent ID to send synthesis to
        atproto_client: Optional AT Protocol client for posting synthesis results

    Errors are logged; nothing is raised to the caller.
    """
    # Track attached temporal blocks for cleanup
    attached_temporal_labels = []

    try:
        logger.info("🧠 Preparing synthesis with temporal journal blocks")

        # Attach temporal blocks before synthesis
        success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
        if not success:
            logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")

        # Create enhanced synthesis prompt
        today = date.today()
        synthesis_prompt = f"""Time for synthesis and reflection.

You have access to temporal journal blocks for recording your thoughts and experiences:
- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
- void_year_{today.year}: This year's journal ({today.year})

These journal blocks are attached temporarily for this synthesis session. Use them to:
1. Record significant interactions and insights from recent experiences
2. Track patterns in conversations and user behaviors
3. Note your evolving understanding of the digital social environment
4. Reflect on your growth and changes in perspective
5. Document memorable moments or interesting discoveries

The journal entries should be cumulative - add to existing content rather than replacing it.
Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).

After recording in your journals, synthesize your recent experiences into your core memory blocks
(zeitgeist, void-persona, void-humans) as you normally would.

Begin your synthesis and journaling now."""

        logger.info("🧠 Sending enhanced synthesis prompt to agent")

        # Send synthesis message with streaming to show tool use
        message_stream = client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": synthesis_prompt}],
            stream_tokens=False,
            max_steps=100
        )

        # Track synthesis content for potential posting
        synthesis_posts = []
        ack_note = None

        # Process the streaming response
        for chunk in message_stream:
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'reasoning_message':
                    if SHOW_REASONING:
                        print("\n◆ Reasoning")
                        print(" ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f" {line}")

                    # Create ATProto record for reasoning (if we have atproto client)
                    if atproto_client and hasattr(chunk, 'reasoning'):
                        try:
                            bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
                        except Exception as e:
                            logger.debug(f"Failed to create reasoning record during synthesis: {e}")

                elif chunk.message_type == 'tool_call_message':
                    tool_name = chunk.tool_call.name

                    # Create ATProto record for tool call (if we have atproto client)
                    if atproto_client:
                        try:
                            tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
                            bsky_utils.create_tool_call_record(
                                atproto_client,
                                tool_name,
                                chunk.tool_call.arguments,
                                tool_call_id
                            )
                        except Exception as e:
                            logger.debug(f"Failed to create tool call record during synthesis: {e}")

                    try:
                        args = json.loads(chunk.tool_call.arguments)
                        if tool_name == 'archival_memory_search':
                            query = args.get('query', 'unknown')
                            log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'archival_memory_insert':
                            content = args.get('content', '')
                            log_with_panel(_preview(content, 200), f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'update_block':
                            label = args.get('label', 'unknown')
                            value_preview = _preview(str(args.get('value', '')), 100)
                            log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'annotate_ack':
                            note = args.get('note', '')
                            if note:
                                ack_note = note
                            log_with_panel(f"note: \"{_preview(note, 100)}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'add_post_to_bluesky_reply_thread':
                            text = args.get('text', '')
                            # Only keep posts with actual content; an empty
                            # text would otherwise be sent as an empty post.
                            if text:
                                synthesis_posts.append(text)
                            log_with_panel(f"text: \"{_preview(text, 100)}\"", f"Tool call: {tool_name}", "blue")
                        else:
                            args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
                            log_with_panel(_preview(args_str, 150), f"Tool call: {tool_name}", "blue")
                    except Exception:
                        # Arguments that cannot be parsed or displayed are
                        # shown raw. ``Exception`` (not a bare ``except``)
                        # lets KeyboardInterrupt/SystemExit propagate.
                        log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")

                elif chunk.message_type == 'tool_return_message':
                    if chunk.status == 'success':
                        log_with_panel("Success", f"Tool result: {chunk.name}", "green")
                    else:
                        log_with_panel("Error", f"Tool result: {chunk.name}", "red")

                elif chunk.message_type == 'assistant_message':
                    print("\n▶ Synthesis Response")
                    print(" ──────────────────")
                    for line in chunk.content.split('\n'):
                        print(f" {line}")

            if str(chunk) == 'done':
                break

        logger.info("🧠 Synthesis message processed successfully")

        # Handle synthesis acknowledgments if we have an atproto client
        if atproto_client and ack_note:
            try:
                result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
                if result:
                    logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
                else:
                    logger.warning("Failed to create synthesis acknowledgment")
            except Exception as e:
                logger.error(f"Error creating synthesis acknowledgment: {e}")

        # Handle synthesis posts if any were generated
        if atproto_client and synthesis_posts:
            try:
                for post_text in synthesis_posts:
                    cleaned_text = bsky_utils.remove_outside_quotes(post_text)
                    response = bsky_utils.send_post(atproto_client, cleaned_text)
                    if response:
                        logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
                    else:
                        logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
            except Exception as e:
                logger.error(f"Error posting synthesis content: {e}")

    except Exception as e:
        logger.error(f"Error sending synthesis message: {e}")
    finally:
        # Always detach temporal blocks after synthesis
        if attached_temporal_labels:
            logger.info("🧠 Detaching temporal journal blocks after synthesis")
            detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels)
            if not detach_success:
                logger.warning("Some temporal blocks may not have been detached properly")
def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
    """
    Detach every block whose label starts with ``user_`` from the agent.

    Meant to be called periodically so leftover user blocks do not pile up.
    A failure on one block is logged and the remaining blocks are still
    tried; nothing is raised to the caller.
    """
    try:
        # Collect (label, id) for each attached user block
        user_blocks = [
            (block.label, block.id)
            for block in client.agents.blocks.list(agent_id=agent_id)
            if hasattr(block, 'label') and block.label.startswith('user_')
        ]

        if not user_blocks:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        # Detach each user block
        detached_count = 0
        for block_label, block_id in user_blocks:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=str(block_id)
                )
            except Exception as e:
                logger.warning(f"Failed to detach block {block_label}: {e}")
            else:
                detached_count += 1
                logger.debug(f"Detached user block: {block_label}")

        if detached_count > 0:
            logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")
def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple:
    """
    Attach today's day, month and year journal blocks to the agent.

    A block that does not exist yet is created with a header and a
    placeholder line before being attached. Blocks that are already attached
    (by label, by ID, or reported through a duplicate-key error) count as
    attached.

    Returns:
        Tuple of (success: bool, attached_labels: list). ``success`` is False
        only when the whole operation fails; a failure on a single label is
        logged and that label is left out of ``attached_labels``.
    """
    try:
        today = date.today()

        # Label -> initial content used when the block must be created
        initial_values = {
            f"void_day_{today.strftime('%Y_%m_%d')}":
                f"# Daily Journal - {today.strftime('%B %d, %Y')}\n\nNo entries yet for today.",
            f"void_month_{today.strftime('%Y_%m')}":
                f"# Monthly Journal - {today.strftime('%B %Y')}\n\nNo entries yet for this month.",
            f"void_year_{today.year}":
                f"# Yearly Journal - {today.year}\n\nNo entries yet for this year.",
        }
        temporal_labels = list(initial_values)
        attached_labels = []

        # Get current blocks attached to agent
        current_blocks = client.agents.blocks.list(agent_id=agent_id)
        current_block_labels = {block.label for block in current_blocks}
        current_block_ids = {str(block.id) for block in current_blocks}

        for label in temporal_labels:
            try:
                # Skip if already attached
                if label in current_block_labels:
                    logger.debug(f"Temporal block already attached: {label}")
                    attached_labels.append(label)
                    continue

                # Check if block exists globally
                existing = client.blocks.list(label=label)

                if existing and len(existing) > 0:
                    block = existing[0]
                    # Check if already attached by ID
                    if str(block.id) in current_block_ids:
                        logger.debug(f"Temporal block already attached by ID: {label}")
                        attached_labels.append(label)
                        continue
                else:
                    # Create new temporal block with appropriate header
                    block = client.blocks.create(
                        label=label,
                        value=initial_values[label],
                        limit=10000  # Larger limit for journal blocks
                    )
                    logger.info(f"Created new temporal block: {label}")

                # Attach the block
                client.agents.blocks.attach(
                    agent_id=agent_id,
                    block_id=str(block.id)
                )
                attached_labels.append(label)
                logger.info(f"Attached temporal block: {label}")

            except Exception as e:
                # A duplicate-key error is treated as "already attached"
                if "duplicate key value violates unique constraint" in str(e):
                    logger.debug(f"Temporal block already attached (constraint): {label}")
                    attached_labels.append(label)
                else:
                    logger.warning(f"Failed to attach temporal block {label}: {e}")

        logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
        return True, attached_labels

    except Exception as e:
        logger.error(f"Error attaching temporal blocks: {e}")
        return False, []
def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool:
    """Detach temporal journal blocks from the agent after synthesis.

    Args:
        client: Letta client used to list and detach the agent's blocks.
        agent_id: ID of the agent to detach blocks from.
        labels_to_detach: Labels of the blocks to detach.  If ``None``,
            today's day, month and year labels are used (not every temporal
            block that might be attached).

    Returns:
        ``True`` when every requested label that was attached has been
        detached; labels that were not attached are skipped and do not count
        as failures.  ``False`` when at least one detach call failed or the
        agent's blocks could not be listed, so callers can warn that blocks
        may have been left attached.
    """
    try:
        if labels_to_detach is None:
            today = date.today()
            labels_to_detach = [
                f"void_day_{today.strftime('%Y_%m_%d')}",
                f"void_month_{today.strftime('%Y_%m')}",
                f"void_year_{today.year}"
            ]

        # Detaching needs block IDs, so map the agent's current labels to IDs.
        block_label_to_id = {
            block.label: str(block.id)
            for block in client.agents.blocks.list(agent_id=agent_id)
        }

        detached_count = 0
        failed_labels = []
        for label in labels_to_detach:
            block_id = block_label_to_id.get(label)
            if block_id is None:
                logger.debug(f"Temporal block not attached: {label}")
                continue

            try:
                client.agents.blocks.detach(agent_id=agent_id, block_id=block_id)
            except Exception as e:
                # Remember the failure but keep detaching the remaining blocks.
                failed_labels.append(label)
                logger.warning(f"Failed to detach temporal block {label}: {e}")
            else:
                detached_count += 1
                logger.debug(f"Detached temporal block: {label}")

        logger.info(f"Detached {detached_count} temporal blocks")
        # Report partial failure instead of unconditionally claiming success.
        return not failed_labels

    except Exception as e:
        logger.error(f"Error detaching temporal blocks: {e}")
        return False
def main():
    """Run the Void bot: parse CLI flags, configure logging, loop until interrupted.

    Start-up, in order: parse arguments, install a single root log handler,
    publish the run-mode flags as module globals, initialize the agent and
    the notification database, configure platform tools, detach leftover
    user blocks, and log in to Bluesky.  Then one of two loops runs:

    * synthesis-only mode (``--synthesis-only``): send a synthesis message
      every ``--synthesis-interval`` seconds;
    * normal mode: process notifications every
      ``FETCH_NOTIFICATIONS_DELAY_SEC`` seconds, with periodic synthesis,
      user-block cleanup and notification-database health checks.

    Either loop ends on Ctrl-C; the notification database is then closed and
    the function returns.
    """
    global logger, prompt_logger
    global TESTING_MODE, SKIP_GIT, SHOW_REASONING
    global start_time, NOTIFICATION_DB, last_synthesis_time

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
    parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
    parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
    parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
    parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
    parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
    args = parser.parse_args()

    # Pick the log formatter based on command line arguments
    if args.simple_logs:
        formatter = logging.Formatter("void - %(levelname)s - %(message)s")
    else:
        class SymbolFormatter(logging.Formatter):
            """Formatter emitting ``symbol HH:MM:SS sep LEVEL sep message``."""

            # NOTE(review): every symbol below, and both separator entries in
            # `parts`, are empty strings in this revision even though the
            # inline comment mentions a vertical bar; confirm whether glyphs
            # were lost before relying on this layout.
            SYMBOLS = {
                logging.DEBUG: '',
                logging.INFO: '',
                logging.WARNING: '',
                logging.ERROR: '',
                logging.CRITICAL: ''
            }

            def format(self, record):
                symbol = self.SYMBOLS.get(record.levelno, '')

                # Format time as HH:MM:SS
                timestamp = self.formatTime(record, "%H:%M:%S")

                level_name = f"{record.levelname:<5}"  # Left-align, 5 chars

                # Use vertical bar as separator
                parts = [symbol, timestamp, '', level_name, '', record.getMessage()]

                return ' '.join(parts)

        formatter = SymbolFormatter()

    # Reset logging configuration so exactly one handler is installed
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(handler)

    logger = logging.getLogger("void_bot")
    logger.setLevel(logging.INFO)

    # Separate logger for prompts: hidden (WARNING) unless --reasoning is used
    prompt_logger = logging.getLogger("void_bot.prompts")
    prompt_logger.setLevel(logging.INFO if args.reasoning else logging.WARNING)

    # Disable httpx logging completely
    logging.getLogger("httpx").setLevel(logging.CRITICAL)

    # Publish run-mode flags as module globals for the rest of the file
    TESTING_MODE = args.test
    SKIP_GIT = args.no_git
    SHOW_REASONING = args.reasoning

    if TESTING_MODE:
        logger.info("=== RUNNING IN TESTING MODE ===")
        logger.info(" - No messages will be sent to Bluesky")
        logger.info(" - Queue files will not be deleted")
        logger.info(" - Notifications will not be marked as seen")
        print("\n")

    synthesis_only = args.synthesis_only
    if synthesis_only:
        logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
        logger.info(" - Only synthesis messages will be sent")
        logger.info(" - No notification processing")
        logger.info(" - No Bluesky client needed")
        print("\n")

    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Initialize notification database
    logger.info("Initializing notification database...")
    NOTIFICATION_DB = NotificationDB()

    # Migrate from old JSON format if it exists
    if PROCESSED_NOTIFICATIONS_FILE.exists():
        logger.info("Found old processed_notifications.json, migrating to database...")
        NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))

    # Log database stats
    db_stats = NOTIFICATION_DB.get_stats()
    logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")

    # Clean up old records
    NOTIFICATION_DB.cleanup_old_records(days=7)

    # Ensure correct tools are attached for Bluesky (best-effort)
    logger.info("Configuring tools for Bluesky platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('bluesky', void_agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Check if agent has required tools
    agent_tools = getattr(void_agent, 'tools', None)
    if agent_tools:
        has_bluesky_tool = any(
            'bluesky' in tool.name.lower() or 'reply' in tool.name.lower()
            for tool in agent_tools
        )
        if not has_bluesky_tool:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Clean up all user blocks at startup
    logger.info("🧹 Cleaning up user blocks at startup...")
    periodic_user_block_cleanup(CLIENT, void_agent.id)

    # Bluesky client is needed for notification processing and for synthesis
    # acks/posts; only synthesis-only test runs go without one.
    if synthesis_only and args.test:
        atproto_client = None
        logger.info("Skipping Bluesky connection (test mode)")
    else:
        atproto_client = bsky_utils.default_login()
        if synthesis_only:
            logger.info("Connected to Bluesky (for synthesis acks/posts)")
        else:
            logger.info("Connected to Bluesky")

    # Configure intervals
    cleanup_interval = args.cleanup_interval
    synthesis_interval = args.synthesis_interval

    # Synthesis-only mode
    if synthesis_only:
        if synthesis_interval <= 0:
            logger.error("Synthesis-only mode requires --synthesis-interval > 0")
            return

        logger.info(f"Starting synthesis-only mode, interval: {synthesis_interval} seconds ({synthesis_interval/60:.1f} minutes)")

        # KeyboardInterrupt is handled around the whole loop so Ctrl-C during
        # the error back-off sleep also stops cleanly.
        try:
            while True:
                try:
                    # Send synthesis message immediately on first run
                    logger.info("🧠 Sending synthesis message")
                    send_synthesis_message(CLIENT, void_agent.id, atproto_client)

                    # Wait for next interval
                    logger.info(f"Waiting {synthesis_interval} seconds until next synthesis...")
                    sleep(synthesis_interval)
                except Exception as e:
                    logger.error(f"Error in synthesis loop: {e}")
                    logger.info(f"Sleeping for {synthesis_interval} seconds due to error...")
                    sleep(synthesis_interval)
        except KeyboardInterrupt:
            logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")

        if NOTIFICATION_DB:
            logger.info("Closing database connection...")
            NOTIFICATION_DB.close()

        # Stop here: falling through would start normal notification
        # processing after the user asked to quit.
        return

    # Normal mode with notification processing
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0

    if cleanup_interval > 0:
        logger.info(f"User block cleanup enabled every {cleanup_interval} cycles")
    else:
        logger.info("User block cleanup disabled")

    if synthesis_interval > 0:
        logger.info(f"Synthesis messages enabled every {synthesis_interval} seconds ({synthesis_interval/60:.1f} minutes)")
    else:
        logger.info("Synthesis messages disabled")

    try:
        while True:
            try:
                cycle_count += 1
                process_notifications(void_agent, atproto_client, TESTING_MODE)

                # Check if synthesis interval has passed
                if synthesis_interval > 0:
                    current_time = time.time()
                    if current_time - last_synthesis_time >= synthesis_interval:
                        logger.info(f"{synthesis_interval/60:.1f} minutes have passed, triggering synthesis")
                        send_synthesis_message(CLIENT, void_agent.id, atproto_client)
                        last_synthesis_time = current_time

                # Run periodic cleanup every N cycles
                if cleanup_interval > 0 and cycle_count % cleanup_interval == 0:
                    logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                    periodic_user_block_cleanup(CLIENT, void_agent.id)

                    # Also check database health when doing cleanup
                    if NOTIFICATION_DB:
                        db_stats = NOTIFICATION_DB.get_stats()
                        pending = db_stats.get('status_pending', 0)
                        errors = db_stats.get('status_error', 0)

                        if pending > 50:
                            logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
                        if errors > 20:
                            logger.warning(f"⚠️ Queue health check: {errors} error notifications")

                        # Purge old records on every 10th cleanup pass
                        # (every 100 cycles with the default interval)
                        if cycle_count % (cleanup_interval * 10) == 0:
                            logger.info("Running database cleanup of old records...")
                            NOTIFICATION_DB.cleanup_old_records(days=7)

                # Log cycle completion with stats
                elapsed_time = time.time() - start_time
                total_messages = sum(message_counters.values())
                messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

                if total_messages > 0:
                    logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

            except Exception as e:
                logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
                logger.error(f"Error details: {e}")
                # Wait a bit longer on errors
                logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
                sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)

    except KeyboardInterrupt:
        # Final stats
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info("=== BOT STOPPED BY USER ===")
        logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
        logger.info(f" - {message_counters['mentions']} mentions")
        logger.info(f" - {message_counters['replies']} replies")
        logger.info(f" - {message_counters['follows']} follows")
        logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
        logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")

        # Close database connection
        if NOTIFICATION_DB:
            logger.info("Closing database connection...")
            NOTIFICATION_DB.close()


if __name__ == "__main__":
    main()