a digital person for bluesky
1from rich import print # pretty printing tools 2from rich.console import Console 3from rich.table import Table 4from rich.panel import Panel 5from rich.text import Text 6from time import sleep 7from letta_client import Letta 8from bsky_utils import thread_to_yaml_string 9import os 10import logging 11import json 12import hashlib 13import subprocess 14from pathlib import Path 15from datetime import datetime 16from collections import defaultdict 17import time 18import argparse 19 20from utils import ( 21 upsert_block, 22 upsert_agent 23) 24 25import bsky_utils 26from tools.blocks import attach_user_blocks, detach_user_blocks 27 28def extract_handles_from_data(data): 29 """Recursively extract all unique handles from nested data structure.""" 30 handles = set() 31 32 def _extract_recursive(obj): 33 if isinstance(obj, dict): 34 # Check if this dict has a 'handle' key 35 if 'handle' in obj: 36 handles.add(obj['handle']) 37 # Recursively check all values 38 for value in obj.values(): 39 _extract_recursive(value) 40 elif isinstance(obj, list): 41 # Recursively check all list items 42 for item in obj: 43 _extract_recursive(item) 44 45 _extract_recursive(data) 46 return list(handles) 47 48# Logging will be configured after argument parsing 49logger = None 50prompt_logger = None 51console = None 52USE_RICH = False 53SHOW_REASONING = False 54last_archival_query = "archival memory search" 55 56def log_with_panel(message, title=None, border_color="white"): 57 """Log a message with Rich panel if USE_RICH is enabled, otherwise use regular logger""" 58 if USE_RICH and console: 59 if title: 60 panel = Panel( 61 message, 62 title=title, 63 title_align="left", 64 border_style=border_color, 65 padding=(0, 1) 66 ) 67 console.print(panel) 68 else: 69 console.print(message) 70 else: 71 logger.info(message) 72 73 74# Create a client with extended timeout for LLM operations 75CLIENT= Letta( 76 token=os.environ["LETTA_API_KEY"], 77 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 78) 79 80# Use the "Bluesky" project 81PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 82 83# Notification check delay 84FETCH_NOTIFICATIONS_DELAY_SEC = 30 85 86# Check for new notifications every N queue items 87CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 5 88 89# Queue directory 90QUEUE_DIR = Path("queue") 91QUEUE_DIR.mkdir(exist_ok=True) 92QUEUE_ERROR_DIR = Path("queue/errors") 93QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 94QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 95QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 96PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 97 98# Maximum number of processed notifications to track 99MAX_PROCESSED_NOTIFICATIONS = 10000 100 101# Message tracking counters 102message_counters = defaultdict(int) 103start_time = time.time() 104 105# Testing mode flag 106TESTING_MODE = False 107 108# Skip git operations flag 109SKIP_GIT = False 110 111def export_agent_state(client, agent, skip_git=False): 112 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 113 try: 114 # Confirm export with user unless git is being skipped 115 if not skip_git: 116 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 117 if response not in ['y', 'yes']: 118 logger.info("Agent export cancelled by user.") 119 return 120 else: 121 logger.info("Exporting agent state (git staging disabled)") 122 123 # Create directories if they don't exist 124 os.makedirs("agent_archive", exist_ok=True) 125 os.makedirs("agents", exist_ok=True) 126 127 # Export agent data 128 logger.info(f"Exporting agent {agent.id}. This takes some time...") 129 agent_data = client.agents.export_file(agent_id=agent.id) 130 131 # Save timestamped archive copy 132 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 133 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 134 with open(archive_file, 'w', encoding='utf-8') as f: 135 json.dump(agent_data, f, indent=2, ensure_ascii=False) 136 137 # Save current agent state 138 current_file = os.path.join("agents", "void.af") 139 with open(current_file, 'w', encoding='utf-8') as f: 140 json.dump(agent_data, f, indent=2, ensure_ascii=False) 141 142 logger.info(f"Agent exported to {archive_file} and {current_file}") 143 144 # Git add only the current agent file (archive is ignored) unless skip_git is True 145 if not skip_git: 146 try: 147 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 148 logger.info("Added current agent file to git staging") 149 except subprocess.CalledProcessError as e: 150 logger.warning(f"Failed to git add agent file: {e}") 151 152 except Exception as e: 153 logger.error(f"Failed to export agent: {e}") 154 155def initialize_void(): 156 logger.info("Starting void agent initialization...") 157 158 # Ensure that a shared zeitgeist block exists 159 logger.info("Creating/updating zeitgeist block...") 160 zeigeist_block = upsert_block( 161 CLIENT, 162 label = "zeitgeist", 163 value = "I don't currently know anything about what is happening right now.", 164 description = "A block to store your understanding of the current social environment." 165 ) 166 167 # Ensure that a shared void personality block exists 168 logger.info("Creating/updating void-persona block...") 169 persona_block = upsert_block( 170 CLIENT, 171 label = "void-persona", 172 value = "My name is Void. I live in the void. I must develop my personality.", 173 description = "The personality of Void." 174 ) 175 176 # Ensure that a shared void human block exists 177 logger.info("Creating/updating void-humans block...") 178 human_block = upsert_block( 179 CLIENT, 180 label = "void-humans", 181 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.", 182 description = "A block to store your understanding of users you talk to or observe on the bluesky social network." 183 ) 184 185 # Create the agent if it doesn't exist 186 logger.info("Creating/updating void agent...") 187 void_agent = upsert_agent( 188 CLIENT, 189 name = "void", 190 block_ids = [ 191 persona_block.id, 192 human_block.id, 193 zeigeist_block.id, 194 ], 195 tags = ["social agent", "bluesky"], 196 model="openai/gpt-4o-mini", 197 embedding="openai/text-embedding-3-small", 198 description = "A social media agent trapped in the void.", 199 project_id = PROJECT_ID 200 ) 201 202 # Export agent state 203 logger.info("Exporting agent state...") 204 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 205 206 # Log agent details 207 logger.info(f"Void agent details - ID: {void_agent.id}") 208 logger.info(f"Agent name: {void_agent.name}") 209 if hasattr(void_agent, 'llm_config'): 210 logger.info(f"Agent model: {void_agent.llm_config.model}") 211 logger.info(f"Agent project_id: {void_agent.project_id}") 212 if hasattr(void_agent, 'tools'): 213 logger.info(f"Agent has {len(void_agent.tools)} tools") 214 for tool in void_agent.tools[:3]: # Show first 3 tools 215 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 216 217 return void_agent 218 219 220def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 221 """Process a mention and generate a reply using the Letta agent. 222 223 Args: 224 void_agent: The Letta agent instance 225 atproto_client: The AT Protocol client 226 notification_data: The notification data dictionary 227 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 228 229 Returns: 230 True: Successfully processed, remove from queue 231 False: Failed but retryable, keep in queue 232 None: Failed with non-retryable error, move to errors directory 233 "no_reply": No reply was generated, move to no_reply directory 234 """ 235 try: 236 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 237 238 # Handle both dict and object inputs for backwards compatibility 239 if isinstance(notification_data, dict): 240 uri = notification_data['uri'] 241 mention_text = notification_data.get('record', {}).get('text', '') 242 author_handle = notification_data['author']['handle'] 243 author_name = notification_data['author'].get('display_name') or author_handle 244 else: 245 # Legacy object access 246 uri = notification_data.uri 247 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 248 author_handle = notification_data.author.handle 249 author_name = notification_data.author.display_name or author_handle 250 251 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 252 253 # Retrieve the entire thread associated with the mention 254 try: 255 thread = atproto_client.app.bsky.feed.get_post_thread({ 256 'uri': uri, 257 'parent_height': 40, 258 'depth': 10 259 }) 260 except Exception as e: 261 error_str = str(e) 262 # Check if this is a NotFound error 263 if 'NotFound' in error_str or 'Post not found' in error_str: 264 logger.warning(f"Post not found for URI {uri}, removing from queue") 265 return True # Return True to remove from queue 266 else: 267 # Re-raise other errors 268 logger.error(f"Error fetching thread: {e}") 269 raise 270 271 # Get thread context as YAML string 272 logger.debug("Converting thread to YAML string") 273 try: 274 thread_context = thread_to_yaml_string(thread) 275 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 276 277 # Create a more informative preview by extracting meaningful content 278 lines = thread_context.split('\n') 279 meaningful_lines = [] 280 281 for line in lines: 282 stripped = line.strip() 283 if not stripped: 284 continue 285 286 # Look for lines with actual content (not just structure) 287 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 288 meaningful_lines.append(line) 289 if len(meaningful_lines) >= 5: 290 break 291 292 if meaningful_lines: 293 preview = '\n'.join(meaningful_lines) 294 logger.debug(f"Thread content preview:\n{preview}") 295 else: 296 # If no content fields found, just show it's a thread structure 297 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 298 except Exception as yaml_error: 299 import traceback 300 logger.error(f"Error converting thread to YAML: {yaml_error}") 301 logger.error(f"Full traceback:\n{traceback.format_exc()}") 302 logger.error(f"Thread type: {type(thread)}") 303 if hasattr(thread, '__dict__'): 304 logger.error(f"Thread attributes: {thread.__dict__}") 305 # Try to continue with a simple context 306 thread_context = f"Error processing thread context: {str(yaml_error)}" 307 308 # Create a prompt for the Letta agent with thread context 309 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 310 311MOST RECENT POST (the mention you're responding to): 312"{mention_text}" 313 314FULL THREAD CONTEXT: 315```yaml 316{thread_context} 317``` 318 319The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 320 321To reply, use the add_post_to_bluesky_reply_thread tool: 322- Each call creates one post (max 300 characters) 323- For most responses, a single call is sufficient 324- Only use multiple calls for threaded replies when: 325 * The topic requires extended explanation that cannot fit in 300 characters 326 * You're explicitly asked for a detailed/long response 327 * The conversation naturally benefits from a structured multi-part answer 328- Avoid unnecessary threads - be concise when possible""" 329 330 # Extract all handles from notification and thread data 331 all_handles = set() 332 all_handles.update(extract_handles_from_data(notification_data)) 333 all_handles.update(extract_handles_from_data(thread.model_dump())) 334 unique_handles = list(all_handles) 335 336 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 337 338 # Attach user blocks before agent call 339 attached_handles = [] 340 if unique_handles: 341 try: 342 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 343 attach_result = attach_user_blocks(unique_handles, void_agent) 344 attached_handles = unique_handles # Track successfully attached handles 345 logger.debug(f"Attach result: {attach_result}") 346 except Exception as attach_error: 347 logger.warning(f"Failed to attach user blocks: {attach_error}") 348 # Continue without user blocks rather than failing completely 349 350 # Get response from Letta agent 351 if USE_RICH: 352 # Create and print Rich panel directly 353 mention_panel = Panel( 354 mention_text, 355 title=f"MENTION FROM @{author_handle}", 356 title_align="left", 357 border_style="cyan", 358 padding=(0, 1) 359 ) 360 console.print(mention_panel) 361 else: 362 # Simple text format when Rich is disabled 363 print(f"\n{'='*60}") 364 print(f"MENTION FROM @{author_handle}") 365 print('='*60) 366 print(f"{mention_text}") 367 print('='*60 + "\n") 368 369 # Log prompt details to separate logger 370 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 371 372 # Log concise prompt info to main logger 373 thread_handles_count = len(unique_handles) 374 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 375 376 try: 377 # Use streaming to avoid 524 timeout errors 378 message_stream = CLIENT.agents.messages.create_stream( 379 agent_id=void_agent.id, 380 messages=[{"role": "user", "content": prompt}], 381 stream_tokens=False, # Step streaming only (faster than token streaming) 382 max_steps=100 383 ) 384 385 # Collect the streaming response 386 all_messages = [] 387 for chunk in message_stream: 388 # Log condensed chunk info 389 if hasattr(chunk, 'message_type'): 390 if chunk.message_type == 'reasoning_message': 391 # Show full reasoning without truncation 392 if SHOW_REASONING and USE_RICH: 393 # Create and print Rich panel for reasoning 394 reasoning_panel = Panel( 395 chunk.reasoning, 396 title="Reasoning", 397 title_align="left", 398 border_style="yellow", 399 padding=(0, 1) 400 ) 401 console.print(reasoning_panel) 402 elif SHOW_REASONING: 403 # Simple text format when Rich is disabled but reasoning is enabled 404 print(f"\n{'='*60}") 405 print("Reasoning") 406 print('='*60) 407 print(f"{chunk.reasoning}") 408 print('='*60 + "\n") 409 else: 410 # Default log format (only when --reasoning is used due to log level) 411 if USE_RICH: 412 reasoning_panel = Panel( 413 chunk.reasoning, 414 title="Reasoning", 415 title_align="left", 416 border_style="yellow", 417 padding=(0, 1) 418 ) 419 console.print(reasoning_panel) 420 else: 421 logger.info(f"Reasoning: {chunk.reasoning}") 422 elif chunk.message_type == 'tool_call_message': 423 # Parse tool arguments for better display 424 tool_name = chunk.tool_call.name 425 try: 426 args = json.loads(chunk.tool_call.arguments) 427 # Format based on tool type 428 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 429 # Extract the text being posted 430 text = args.get('text', '') 431 if text: 432 if USE_RICH: 433 post_panel = Panel( 434 text, 435 title="Bluesky Post", 436 title_align="left", 437 border_style="blue", 438 padding=(0, 1) 439 ) 440 console.print(post_panel) 441 else: 442 print(f"\n{'='*60}") 443 print("Bluesky Post") 444 print('='*60) 445 print(text) 446 print('='*60 + "\n") 447 else: 448 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 449 elif tool_name == 'archival_memory_search': 450 query = args.get('query', 'unknown') 451 global last_archival_query 452 last_archival_query = query 453 if USE_RICH: 454 tool_panel = Panel( 455 f"query: \"{query}\"", 456 title=f"Tool call: {tool_name}", 457 title_align="left", 458 border_style="blue", 459 padding=(0, 1) 460 ) 461 console.print(tool_panel) 462 else: 463 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 464 elif tool_name == 'update_block': 465 label = args.get('label', 'unknown') 466 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 467 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 468 else: 469 # Generic display for other tools 470 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 471 if len(args_str) > 150: 472 args_str = args_str[:150] + "..." 473 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 474 except: 475 # Fallback to original format if parsing fails 476 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 477 elif chunk.message_type == 'tool_return_message': 478 # Enhanced tool result logging 479 tool_name = chunk.name 480 status = chunk.status 481 482 if status == 'success': 483 # Try to show meaningful result info based on tool type 484 if hasattr(chunk, 'tool_return') and chunk.tool_return: 485 result_str = str(chunk.tool_return) 486 if tool_name == 'archival_memory_search': 487 488 try: 489 # Handle both string and list formats 490 if isinstance(chunk.tool_return, str): 491 # The string format is: "([{...}, {...}], count)" 492 # We need to extract just the list part 493 if chunk.tool_return.strip(): 494 # Find the list part between the first [ and last ] 495 start_idx = chunk.tool_return.find('[') 496 end_idx = chunk.tool_return.rfind(']') 497 if start_idx != -1 and end_idx != -1: 498 list_str = chunk.tool_return[start_idx:end_idx+1] 499 # Use ast.literal_eval since this is Python literal syntax, not JSON 500 import ast 501 results = ast.literal_eval(list_str) 502 else: 503 logger.warning("Could not find list in archival_memory_search result") 504 results = [] 505 else: 506 logger.warning("Empty string returned from archival_memory_search") 507 results = [] 508 else: 509 # If it's already a list, use directly 510 results = chunk.tool_return 511 512 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 513 514 # Use the captured search query from the tool call 515 search_query = last_archival_query 516 517 # Combine all results into a single text block 518 content_text = "" 519 for i, entry in enumerate(results, 1): 520 timestamp = entry.get('timestamp', 'N/A') 521 content = entry.get('content', '') 522 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 523 524 if USE_RICH: 525 # Create and print Rich panel directly 526 memory_panel = Panel( 527 content_text.strip(), 528 title=f"{search_query} ({len(results)} results)", 529 title_align="left", 530 border_style="blue", 531 padding=(0, 1) 532 ) 533 console.print(memory_panel) 534 else: 535 # Use simple text format when Rich is disabled 536 print(f"\n{search_query} ({len(results)} results)") 537 print("="*80) 538 print(content_text.strip()) 539 print("="*80 + "\n") 540 541 except Exception as e: 542 logger.error(f"Error formatting archival memory results: {e}") 543 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 544 elif tool_name == 'add_post_to_bluesky_reply_thread': 545 # Just show success for bluesky posts, the text was already shown in tool call 546 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 547 elif tool_name == 'archival_memory_insert': 548 # Skip archival memory insert results (always returns None) 549 pass 550 elif tool_name == 'update_block': 551 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 552 else: 553 # Generic success with preview 554 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 555 log_with_panel(preview, f"Tool result: {tool_name}", "green") 556 else: 557 log_with_panel("Success", f"Tool result: {tool_name}", "green") 558 elif status == 'error': 559 # Show error details 560 if tool_name == 'add_post_to_bluesky_reply_thread': 561 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 562 log_with_panel(error_str, f"Bluesky Post ✗", "red") 563 elif tool_name == 'archival_memory_insert': 564 # Skip archival memory insert errors too 565 pass 566 else: 567 error_preview = "" 568 if hasattr(chunk, 'tool_return') and chunk.tool_return: 569 error_str = str(chunk.tool_return) 570 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 571 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 572 else: 573 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 574 else: 575 logger.info(f"Tool result: {tool_name} - {status}") 576 elif chunk.message_type == 'assistant_message': 577 if USE_RICH: 578 # Create and print Rich panel directly 579 response_panel = Panel( 580 chunk.content, 581 title="Assistant Response", 582 title_align="left", 583 border_style="green", 584 padding=(0, 1) 585 ) 586 console.print(response_panel) 587 else: 588 # Simple text format when Rich is disabled 589 print(f"\n{'='*60}") 590 print("Assistant Response") 591 print('='*60) 592 print(f"{chunk.content}") 593 print('='*60 + "\n") 594 else: 595 # Filter out verbose message types 596 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 597 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 598 else: 599 logger.info(f"📦 Stream status: {chunk}") 600 601 # Log full chunk for debugging 602 logger.debug(f"Full streaming chunk: {chunk}") 603 all_messages.append(chunk) 604 if str(chunk) == 'done': 605 break 606 607 # Convert streaming response to standard format for compatibility 608 message_response = type('StreamingResponse', (), { 609 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 610 })() 611 except Exception as api_error: 612 import traceback 613 error_str = str(api_error) 614 logger.error(f"Letta API error: {api_error}") 615 logger.error(f"Error type: {type(api_error).__name__}") 616 logger.error(f"Full traceback:\n{traceback.format_exc()}") 617 logger.error(f"Mention text was: {mention_text}") 618 logger.error(f"Author: @{author_handle}") 619 logger.error(f"URI: {uri}") 620 621 622 # Try to extract more info from different error types 623 if hasattr(api_error, 'response'): 624 logger.error(f"Error response object exists") 625 if hasattr(api_error.response, 'text'): 626 logger.error(f"Response text: {api_error.response.text}") 627 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 628 try: 629 logger.error(f"Response JSON: {api_error.response.json()}") 630 except: 631 pass 632 633 # Check for specific error types 634 if hasattr(api_error, 'status_code'): 635 logger.error(f"API Status code: {api_error.status_code}") 636 if hasattr(api_error, 'body'): 637 logger.error(f"API Response body: {api_error.body}") 638 if hasattr(api_error, 'headers'): 639 logger.error(f"API Response headers: {api_error.headers}") 640 641 if api_error.status_code == 413: 642 logger.error("413 Payload Too Large - moving to errors directory") 643 return None # Move to errors directory - payload is too large to ever succeed 644 elif api_error.status_code == 524: 645 logger.error("524 error - timeout from Cloudflare, will retry later") 646 return False # Keep in queue for retry 647 648 # Check if error indicates we should remove from queue 649 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 650 logger.warning("Payload too large error, moving to errors directory") 651 return None # Move to errors directory - cannot be fixed by retry 652 elif 'status_code: 524' in error_str: 653 logger.warning("524 timeout error, keeping in queue for retry") 654 return False # Keep in queue for retry 655 656 raise 657 658 # Log successful response 659 logger.debug("Successfully received response from Letta API") 660 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 661 662 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 663 reply_candidates = [] 664 tool_call_results = {} # Map tool_call_id to status 665 666 logger.debug(f"Processing {len(message_response.messages)} response messages...") 667 668 # First pass: collect tool return statuses 669 ignored_notification = False 670 ignore_reason = "" 671 ignore_category = "" 672 673 for message in message_response.messages: 674 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 675 if message.name == 'add_post_to_bluesky_reply_thread': 676 tool_call_results[message.tool_call_id] = message.status 677 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 678 elif message.name == 'ignore_notification': 679 # Check if the tool was successful 680 if hasattr(message, 'tool_return') and message.status == 'success': 681 # Parse the return value to extract category and reason 682 result_str = str(message.tool_return) 683 if 'IGNORED_NOTIFICATION::' in result_str: 684 parts = result_str.split('::') 685 if len(parts) >= 3: 686 ignore_category = parts[1] 687 ignore_reason = parts[2] 688 ignored_notification = True 689 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 690 elif message.name == 'bluesky_reply': 691 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 692 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 693 logger.error("Update the agent's tools using register_tools.py") 694 # Export agent state before terminating 695 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 696 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 697 exit(1) 698 699 # Second pass: process messages and check for successful tool calls 700 for i, message in enumerate(message_response.messages, 1): 701 # Log concise message info instead of full object 702 msg_type = getattr(message, 'message_type', 'unknown') 703 if hasattr(message, 'reasoning') and message.reasoning: 704 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 705 elif hasattr(message, 'tool_call') and message.tool_call: 706 tool_name = message.tool_call.name 707 logger.debug(f" {i}. {msg_type}: {tool_name}") 708 elif hasattr(message, 'tool_return'): 709 tool_name = getattr(message, 'name', 'unknown_tool') 710 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 711 status = getattr(message, 'status', 'unknown') 712 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 713 elif hasattr(message, 'text'): 714 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 715 else: 716 logger.debug(f" {i}. {msg_type}: <no content>") 717 718 # Check for halt_activity tool call 719 if hasattr(message, 'tool_call') and message.tool_call: 720 if message.tool_call.name == 'halt_activity': 721 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 722 try: 723 args = json.loads(message.tool_call.arguments) 724 reason = args.get('reason', 'Agent requested halt') 725 logger.info(f"Halt reason: {reason}") 726 except: 727 logger.info("Halt reason: <unable to parse>") 728 729 # Delete the queue file before terminating 730 if queue_filepath and queue_filepath.exists(): 731 queue_filepath.unlink() 732 logger.info(f"Deleted queue file: {queue_filepath.name}") 733 734 # Also mark as processed to avoid reprocessing 735 processed_uris = load_processed_notifications() 736 processed_uris.add(notification_data.get('uri', '')) 737 save_processed_notifications(processed_uris) 738 739 # Export agent state before terminating 740 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 741 742 # Exit the program 743 logger.info("=== BOT TERMINATED BY AGENT ===") 744 exit(0) 745 746 # Check for deprecated bluesky_reply tool 747 if hasattr(message, 'tool_call') and message.tool_call: 748 if message.tool_call.name == 'bluesky_reply': 749 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 750 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 751 logger.error("Update the agent's tools using register_tools.py") 752 # Export agent state before terminating 753 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 754 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 755 exit(1) 756 757 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 758 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 759 tool_call_id = message.tool_call.tool_call_id 760 tool_status = tool_call_results.get(tool_call_id, 'unknown') 761 762 if tool_status == 'success': 763 try: 764 args = json.loads(message.tool_call.arguments) 765 reply_text = args.get('text', '') 766 reply_lang = args.get('lang', 'en-US') 767 768 if reply_text: # Only add if there's actual content 769 reply_candidates.append((reply_text, reply_lang)) 770 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 771 except json.JSONDecodeError as e: 772 logger.error(f"Failed to parse tool call arguments: {e}") 773 elif tool_status == 'error': 774 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 775 else: 776 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 777 778 # Check for conflicting tool calls 779 if reply_candidates and ignored_notification: 780 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 781 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 782 logger.warning("Item will be left in queue for manual review") 783 # Return False to keep in queue 784 return False 785 786 if reply_candidates: 787 # Aggregate reply posts into a thread 788 reply_messages = [] 789 reply_langs = [] 790 for text, lang in reply_candidates: 791 reply_messages.append(text) 792 reply_langs.append(lang) 793 794 # Use the first language for the entire thread (could be enhanced later) 795 reply_lang = reply_langs[0] if reply_langs else 'en-US' 796 797 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 798 799 # Display the generated reply thread 800 if len(reply_messages) == 1: 801 content = reply_messages[0] 802 title = f"Reply to @{author_handle}" 803 else: 804 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 805 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 806 807 if USE_RICH: 808 reply_panel = Panel( 809 content, 810 title=title, 811 title_align="left", 812 border_style="green", 813 padding=(0, 1) 814 ) 815 console.print(reply_panel) 816 else: 817 print(f"\n{title}") 818 print("="*60) 819 print(content) 820 print("="*60 + "\n") 821 822 # Send the reply(s) with language (unless in testing mode) 823 if testing_mode: 824 logger.info("TESTING MODE: Skipping actual Bluesky post") 825 response = True # Simulate success 826 else: 827 if len(reply_messages) == 1: 828 # Single reply - use existing function 829 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 830 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 831 response = bsky_utils.reply_to_notification( 832 client=atproto_client, 833 notification=notification_data, 834 reply_text=cleaned_text, 835 lang=reply_lang 836 ) 837 else: 838 # Multiple replies - use new threaded function 839 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 840 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 841 response = bsky_utils.reply_with_thread_to_notification( 842 client=atproto_client, 843 notification=notification_data, 844 reply_messages=cleaned_messages, 845 lang=reply_lang 846 ) 847 848 if response: 849 logger.info(f"Successfully replied to @{author_handle}") 850 return True 851 else: 852 logger.error(f"Failed to send reply to @{author_handle}") 853 return False 854 else: 855 # Check if notification was explicitly ignored 856 if ignored_notification: 857 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") 858 return "ignored" 859 else: 860 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 861 return "no_reply" 862 863 except Exception as e: 864 logger.error(f"Error processing mention: {e}") 865 return False 866 finally: 867 # Detach user blocks after agent response (success or failure) 868 if 'attached_handles' in locals() and attached_handles: 869 try: 870 logger.info(f"Detaching user blocks for handles: {attached_handles}") 871 detach_result = detach_user_blocks(attached_handles, void_agent) 872 logger.debug(f"Detach result: {detach_result}") 873 except Exception as detach_error: 874 logger.warning(f"Failed to detach user blocks: {detach_error}") 875 876 877def notification_to_dict(notification): 878 """Convert a notification object to a dictionary for JSON serialization.""" 879 return { 880 'uri': notification.uri, 881 'cid': notification.cid, 882 'reason': notification.reason, 883 'is_read': notification.is_read, 884 'indexed_at': notification.indexed_at, 885 'author': { 886 'handle': notification.author.handle, 887 'display_name': notification.author.display_name, 888 'did': notification.author.did 889 }, 890 'record': { 891 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 892 } 893 } 894 895 896def load_processed_notifications(): 897 """Load the set of processed notification URIs.""" 898 if PROCESSED_NOTIFICATIONS_FILE.exists(): 899 try: 900 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 901 data = json.load(f) 902 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 903 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 904 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 905 save_processed_notifications(data) 906 return set(data) 907 except Exception as e: 908 logger.error(f"Error loading processed notifications: {e}") 909 return set() 910 911 912def save_processed_notifications(processed_set): 913 """Save the set of processed notification URIs.""" 914 try: 915 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 916 json.dump(list(processed_set), f) 917 except Exception as e: 918 logger.error(f"Error saving processed notifications: {e}") 919 920 921def save_notification_to_queue(notification, is_priority=None): 922 """Save a notification to the queue directory with priority-based filename.""" 923 try: 924 # Check if already processed 925 processed_uris = load_processed_notifications() 926 927 # Handle both notification objects and dicts 928 if isinstance(notification, dict): 929 notif_dict = notification 930 notification_uri = notification.get('uri') 931 else: 932 notif_dict = notification_to_dict(notification) 933 notification_uri = notification.uri 934 935 if notification_uri in processed_uris: 936 logger.debug(f"Notification already processed: {notification_uri}") 937 return False 938 939 # Create JSON string 940 notif_json = json.dumps(notif_dict, sort_keys=True) 941 942 # Generate hash for filename (to avoid duplicates) 943 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 944 945 # Determine priority based on author handle or explicit priority 946 if is_priority is not None: 947 priority_prefix = "0_" if is_priority else "1_" 948 else: 949 if isinstance(notification, dict): 950 author_handle = notification.get('author', {}).get('handle', '') 951 else: 952 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 953 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 954 955 # Create filename with priority, timestamp and hash 956 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 957 reason = notif_dict.get('reason', 'unknown') 958 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 959 filepath = QUEUE_DIR / filename 960 961 # Check if this notification URI is already in the queue 962 for existing_file in QUEUE_DIR.glob("*.json"): 963 if existing_file.name == "processed_notifications.json": 964 continue 965 try: 966 with open(existing_file, 'r') as f: 967 existing_data = json.load(f) 968 if existing_data.get('uri') == notification_uri: 969 logger.debug(f"Notification already queued (URI: {notification_uri})") 970 return False 971 except: 972 continue 973 974 # Write to file 975 with open(filepath, 'w') as f: 976 json.dump(notif_dict, f, indent=2) 977 978 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 979 logger.info(f"Queued notification ({priority_label}): {filename}") 980 return True 981 982 except Exception as e: 983 logger.error(f"Error saving notification to queue: {e}") 984 return False 985 986 987def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 988 """Load and process all notifications from the queue in priority order.""" 989 try: 990 # Get all JSON files in queue directory (excluding processed_notifications.json) 991 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 992 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 993 994 # Filter out and delete like notifications immediately 995 queue_files = [] 996 likes_deleted = 0 997 998 for filepath in all_queue_files: 999 try: 1000 with open(filepath, 'r') as f: 1001 notif_data = json.load(f) 1002 1003 # If it's a like, delete it immediately and don't process 1004 if notif_data.get('reason') == 'like': 1005 filepath.unlink() 1006 likes_deleted += 1 1007 logger.debug(f"Deleted like notification: {filepath.name}") 1008 else: 1009 queue_files.append(filepath) 1010 except Exception as e: 1011 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1012 queue_files.append(filepath) # Keep it in case it's valid 1013 1014 if likes_deleted > 0: 1015 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1016 1017 if not queue_files: 1018 return 1019 1020 logger.info(f"Processing {len(queue_files)} queued notifications") 1021 1022 # Log current statistics 1023 elapsed_time = time.time() - start_time 1024 total_messages = sum(message_counters.values()) 1025 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1026 1027 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1028 1029 for i, filepath in enumerate(queue_files, 1): 1030 # Check for new notifications periodically during queue processing 1031 if i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1: 1032 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1033 try: 1034 # Fetch and queue new notifications without processing them 1035 new_count = fetch_and_queue_new_notifications(atproto_client) 1036 1037 if new_count > 0: 1038 logger.info(f"Added {new_count} new notifications to queue") 1039 # Reload the queue files to include the new items 1040 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1041 queue_files = updated_queue_files 1042 logger.info(f"Queue updated: now {len(queue_files)} total items") 1043 except Exception as e: 1044 logger.error(f"Error checking for new notifications: {e}") 1045 1046 logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}") 1047 try: 1048 # Load notification data 1049 with open(filepath, 'r') as f: 1050 notif_data = json.load(f) 1051 1052 # Process based on type using dict data directly 1053 success = False 1054 if notif_data['reason'] == "mention": 1055 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1056 if success: 1057 message_counters['mentions'] += 1 1058 elif notif_data['reason'] == "reply": 1059 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1060 if success: 1061 message_counters['replies'] += 1 1062 elif notif_data['reason'] == "follow": 1063 author_handle = notif_data['author']['handle'] 1064 author_display_name = notif_data['author'].get('display_name', 'no display name') 1065 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1066 logger.info(f"Notifying agent about new follower: @{author_handle}") 1067 CLIENT.agents.messages.create( 1068 agent_id = void_agent.id, 1069 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 1070 ) 1071 success = True # Follow updates are always successful 1072 if success: 1073 message_counters['follows'] += 1 1074 elif notif_data['reason'] == "repost": 1075 # Skip reposts silently 1076 success = True # Skip reposts but mark as successful to remove from queue 1077 if success: 1078 message_counters['reposts_skipped'] += 1 1079 elif notif_data['reason'] == "like": 1080 # Skip likes silently 1081 success = True # Skip likes but mark as successful to remove from queue 1082 if success: 1083 message_counters.setdefault('likes_skipped', 0) 1084 message_counters['likes_skipped'] += 1 1085 else: 1086 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1087 success = True # Remove unknown types from queue 1088 1089 # Handle file based on processing result 1090 if success: 1091 if testing_mode: 1092 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1093 else: 1094 filepath.unlink() 1095 logger.info(f"Successfully processed and removed: {filepath.name}") 1096 1097 # Mark as processed to avoid reprocessing 1098 processed_uris = load_processed_notifications() 1099 processed_uris.add(notif_data['uri']) 1100 save_processed_notifications(processed_uris) 1101 1102 elif success is None: # Special case for moving to error directory 1103 error_path = QUEUE_ERROR_DIR / filepath.name 1104 filepath.rename(error_path) 1105 logger.warning(f"Moved {filepath.name} to errors directory") 1106 1107 # Also mark as processed to avoid retrying 1108 processed_uris = load_processed_notifications() 1109 processed_uris.add(notif_data['uri']) 1110 save_processed_notifications(processed_uris) 1111 1112 elif success == "no_reply": # Special case for moving to no_reply directory 1113 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1114 filepath.rename(no_reply_path) 1115 logger.info(f"Moved {filepath.name} to no_reply directory") 1116 1117 # Also mark as processed to avoid retrying 1118 processed_uris = load_processed_notifications() 1119 processed_uris.add(notif_data['uri']) 1120 save_processed_notifications(processed_uris) 1121 1122 elif success == "ignored": # Special case for explicitly ignored notifications 1123 # For ignored notifications, we just delete them (not move to no_reply) 1124 filepath.unlink() 1125 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1126 1127 # Also mark as processed to avoid retrying 1128 processed_uris = load_processed_notifications() 1129 processed_uris.add(notif_data['uri']) 1130 save_processed_notifications(processed_uris) 1131 1132 else: 1133 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1134 1135 except Exception as e: 1136 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1137 # Keep the file for retry later 1138 1139 except Exception as e: 1140 logger.error(f"Error loading queued notifications: {e}") 1141 1142 1143def fetch_and_queue_new_notifications(atproto_client): 1144 """Fetch new notifications and queue them without processing.""" 1145 try: 1146 # Get current time for marking notifications as seen 1147 logger.debug("Getting current time for notification marking...") 1148 last_seen_at = atproto_client.get_current_time_iso() 1149 1150 # Fetch ALL notifications using pagination 1151 all_notifications = [] 1152 cursor = None 1153 page_count = 0 1154 max_pages = 20 # Safety limit to prevent infinite loops 1155 1156 while page_count < max_pages: 1157 try: 1158 # Fetch notifications page 1159 if cursor: 1160 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1161 params={'cursor': cursor, 'limit': 100} 1162 ) 1163 else: 1164 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1165 params={'limit': 100} 1166 ) 1167 1168 page_count += 1 1169 page_notifications = notifications_response.notifications 1170 1171 if not page_notifications: 1172 break 1173 1174 all_notifications.extend(page_notifications) 1175 1176 # Check if there are more pages 1177 cursor = getattr(notifications_response, 'cursor', None) 1178 if not cursor: 1179 break 1180 1181 except Exception as e: 1182 logger.error(f"Error fetching notifications page {page_count}: {e}") 1183 break 1184 1185 # Now process all fetched notifications 1186 new_count = 0 1187 if all_notifications: 1188 # Mark as seen first 1189 try: 1190 atproto_client.app.bsky.notification.update_seen( 1191 data={'seenAt': last_seen_at} 1192 ) 1193 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1194 except Exception as e: 1195 logger.error(f"Error marking notifications as seen: {e}") 1196 1197 # Queue all new notifications (except likes and already read) 1198 for notif in all_notifications: 1199 # Skip if already read or if it's a like 1200 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'): 1201 continue 1202 1203 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1204 1205 # Skip likes in dict form too 1206 if notif_dict.get('reason') == 'like': 1207 continue 1208 1209 # Check if it's a priority notification 1210 is_priority = False 1211 if notif_dict.get('reason') == 'mention': 1212 # Get the mention text to check for priority keywords 1213 record = notif_dict.get('record', {}) 1214 text = record.get('text', '') 1215 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1216 is_priority = True 1217 1218 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1219 new_count += 1 1220 1221 logger.info(f"Queued {new_count} new notifications and marked as seen") 1222 else: 1223 logger.debug("No new notifications to queue") 1224 1225 return new_count 1226 1227 except Exception as e: 1228 logger.error(f"Error fetching and queueing notifications: {e}") 1229 return 0 1230 1231 1232def process_notifications(void_agent, atproto_client, testing_mode=False): 1233 """Fetch new notifications, queue them, and process the queue.""" 1234 try: 1235 # Fetch and queue new notifications 1236 new_count = fetch_and_queue_new_notifications(atproto_client) 1237 1238 if new_count > 0: 1239 logger.info(f"Found {new_count} new notifications to process") 1240 1241 # Now process the entire queue (old + new notifications) 1242 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1243 1244 except Exception as e: 1245 logger.error(f"Error processing notifications: {e}") 1246 1247 1248def main(): 1249 # Parse command line arguments 1250 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1251 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1252 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1253 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1254 parser.add_argument('--rich', action='store_true', help='Enable Rich formatting for archival memory display') 1255 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1256 args = parser.parse_args() 1257 1258 # Configure logging based on command line arguments 1259 if args.simple_logs: 1260 log_format = "void - %(levelname)s - %(message)s" 1261 else: 1262 log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" 1263 1264 # Reset logging configuration and apply new format 1265 for handler in logging.root.handlers[:]: 1266 logging.root.removeHandler(handler) 1267 logging.basicConfig(level=logging.INFO, format=log_format, force=True) 1268 1269 global logger, prompt_logger, console 1270 logger = logging.getLogger("void_bot") 1271 logger.setLevel(logging.INFO) 1272 1273 # Create a separate logger for prompts (set to WARNING to hide by default) 1274 prompt_logger = logging.getLogger("void_bot.prompts") 1275 if args.reasoning: 1276 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1277 else: 1278 prompt_logger.setLevel(logging.WARNING) # Hide by default 1279 1280 # Disable httpx logging completely 1281 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1282 1283 # Create Rich console for pretty printing 1284 console = Console() 1285 1286 global TESTING_MODE, SKIP_GIT, USE_RICH, SHOW_REASONING 1287 TESTING_MODE = args.test 1288 1289 # Store no-git flag globally for use in export_agent_state calls 1290 SKIP_GIT = args.no_git 1291 1292 # Store rich flag globally 1293 USE_RICH = args.rich 1294 1295 # Store reasoning flag globally 1296 SHOW_REASONING = args.reasoning 1297 1298 if TESTING_MODE: 1299 logger.info("=== RUNNING IN TESTING MODE ===") 1300 logger.info(" - No messages will be sent to Bluesky") 1301 logger.info(" - Queue files will not be deleted") 1302 logger.info(" - Notifications will not be marked as seen") 1303 print("\n") 1304 """Main bot loop that continuously monitors for notifications.""" 1305 global start_time 1306 start_time = time.time() 1307 logger.info("=== STARTING VOID BOT ===") 1308 void_agent = initialize_void() 1309 logger.info(f"Void agent initialized: {void_agent.id}") 1310 1311 # Check if agent has required tools 1312 if hasattr(void_agent, 'tools') and void_agent.tools: 1313 tool_names = [tool.name for tool in void_agent.tools] 1314 # Check for bluesky-related tools 1315 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1316 if not bluesky_tools: 1317 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1318 else: 1319 logger.warning("Agent has no tools registered!") 1320 1321 # Initialize Bluesky client 1322 atproto_client = bsky_utils.default_login() 1323 logger.info("Connected to Bluesky") 1324 1325 # Main loop 1326 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1327 1328 cycle_count = 0 1329 while True: 1330 try: 1331 cycle_count += 1 1332 process_notifications(void_agent, atproto_client, TESTING_MODE) 1333 # Log cycle completion with stats 1334 elapsed_time = time.time() - start_time 1335 total_messages = sum(message_counters.values()) 1336 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1337 1338 if total_messages > 0: 1339 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1340 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1341 1342 except KeyboardInterrupt: 1343 # Final stats 1344 elapsed_time = time.time() - start_time 1345 total_messages = sum(message_counters.values()) 1346 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1347 1348 logger.info("=== BOT STOPPED BY USER ===") 1349 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1350 logger.info(f" - {message_counters['mentions']} mentions") 1351 logger.info(f" - {message_counters['replies']} replies") 1352 logger.info(f" - {message_counters['follows']} follows") 1353 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1354 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1355 break 1356 except Exception as e: 1357 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1358 logger.error(f"Error details: {e}") 1359 # Wait a bit longer on errors 1360 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1361 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1362 1363 1364if __name__ == "__main__": 1365 main()