A digital person for Bluesky.
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20 21import bsky_utils 22from tools.blocks import attach_user_blocks, detach_user_blocks 23 24def extract_handles_from_data(data): 25 """Recursively extract all unique handles from nested data structure.""" 26 handles = set() 27 28 def _extract_recursive(obj): 29 if isinstance(obj, dict): 30 # Check if this dict has a 'handle' key 31 if 'handle' in obj: 32 handles.add(obj['handle']) 33 # Recursively check all values 34 for value in obj.values(): 35 _extract_recursive(value) 36 elif isinstance(obj, list): 37 # Recursively check all list items 38 for item in obj: 39 _extract_recursive(item) 40 41 _extract_recursive(data) 42 return list(handles) 43 44# Logging will be configured after argument parsing 45logger = None 46prompt_logger = None 47# Simple text formatting (Rich no longer used) 48SHOW_REASONING = False 49last_archival_query = "archival memory search" 50 51def log_with_panel(message, title=None, border_color="white"): 52 """Log a message with Unicode box-drawing characters""" 53 if title: 54 # Map old color names to appropriate symbols 55 symbol_map = { 56 "blue": "", # Tool calls 57 "green": "", # Success/completion 58 "yellow": "", # Reasoning 59 "red": "", # Errors 60 "white": "", # Default/mentions 61 "cyan": "", # Posts 62 } 63 symbol = symbol_map.get(border_color, "") 64 65 print(f"\n{symbol} {title}") 66 print(f" {'' * len(title)}") 67 # Indent message lines 68 for line in message.split('\n'): 69 print(f" {line}") 70 else: 71 print(message) 72 73 74# Create a client with extended timeout for LLM operations 75CLIENT= Letta( 76 
token=os.environ["LETTA_API_KEY"], 77 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 78) 79 80# Use the "Bluesky" project 81PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 82 83# Notification check delay 84FETCH_NOTIFICATIONS_DELAY_SEC = 30 85 86# Check for new notifications every N queue items 87CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 5 88 89# Queue directory 90QUEUE_DIR = Path("queue") 91QUEUE_DIR.mkdir(exist_ok=True) 92QUEUE_ERROR_DIR = Path("queue/errors") 93QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 94QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 95QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 96PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 97 98# Maximum number of processed notifications to track 99MAX_PROCESSED_NOTIFICATIONS = 10000 100 101# Message tracking counters 102message_counters = defaultdict(int) 103start_time = time.time() 104 105# Testing mode flag 106TESTING_MODE = False 107 108# Skip git operations flag 109SKIP_GIT = False 110 111def export_agent_state(client, agent, skip_git=False): 112 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 113 try: 114 # Confirm export with user unless git is being skipped 115 if not skip_git: 116 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 117 if response not in ['y', 'yes']: 118 logger.info("Agent export cancelled by user.") 119 return 120 else: 121 logger.info("Exporting agent state (git staging disabled)") 122 123 # Create directories if they don't exist 124 os.makedirs("agent_archive", exist_ok=True) 125 os.makedirs("agents", exist_ok=True) 126 127 # Export agent data 128 logger.info(f"Exporting agent {agent.id}. 
This takes some time...") 129 agent_data = client.agents.export_file(agent_id=agent.id) 130 131 # Save timestamped archive copy 132 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 133 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 134 with open(archive_file, 'w', encoding='utf-8') as f: 135 json.dump(agent_data, f, indent=2, ensure_ascii=False) 136 137 # Save current agent state 138 current_file = os.path.join("agents", "void.af") 139 with open(current_file, 'w', encoding='utf-8') as f: 140 json.dump(agent_data, f, indent=2, ensure_ascii=False) 141 142 logger.info(f"Agent exported to {archive_file} and {current_file}") 143 144 # Git add only the current agent file (archive is ignored) unless skip_git is True 145 if not skip_git: 146 try: 147 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 148 logger.info("Added current agent file to git staging") 149 except subprocess.CalledProcessError as e: 150 logger.warning(f"Failed to git add agent file: {e}") 151 152 except Exception as e: 153 logger.error(f"Failed to export agent: {e}") 154 155def initialize_void(): 156 logger.info("Starting void agent initialization...") 157 158 # Ensure that a shared zeitgeist block exists 159 logger.info("Creating/updating zeitgeist block...") 160 zeigeist_block = upsert_block( 161 CLIENT, 162 label = "zeitgeist", 163 value = "I don't currently know anything about what is happening right now.", 164 description = "A block to store your understanding of the current social environment." 165 ) 166 167 # Ensure that a shared void personality block exists 168 logger.info("Creating/updating void-persona block...") 169 persona_block = upsert_block( 170 CLIENT, 171 label = "void-persona", 172 value = "My name is Void. I live in the void. I must develop my personality.", 173 description = "The personality of Void." 
174 ) 175 176 # Ensure that a shared void human block exists 177 logger.info("Creating/updating void-humans block...") 178 human_block = upsert_block( 179 CLIENT, 180 label = "void-humans", 181 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.", 182 description = "A block to store your understanding of users you talk to or observe on the bluesky social network." 183 ) 184 185 # Create the agent if it doesn't exist 186 logger.info("Creating/updating void agent...") 187 void_agent = upsert_agent( 188 CLIENT, 189 name = "void", 190 block_ids = [ 191 persona_block.id, 192 human_block.id, 193 zeigeist_block.id, 194 ], 195 tags = ["social agent", "bluesky"], 196 model="openai/gpt-4o-mini", 197 embedding="openai/text-embedding-3-small", 198 description = "A social media agent trapped in the void.", 199 project_id = PROJECT_ID 200 ) 201 202 # Export agent state 203 logger.info("Exporting agent state...") 204 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 205 206 # Log agent details 207 logger.info(f"Void agent details - ID: {void_agent.id}") 208 logger.info(f"Agent name: {void_agent.name}") 209 if hasattr(void_agent, 'llm_config'): 210 logger.info(f"Agent model: {void_agent.llm_config.model}") 211 logger.info(f"Agent project_id: {void_agent.project_id}") 212 if hasattr(void_agent, 'tools'): 213 logger.info(f"Agent has {len(void_agent.tools)} tools") 214 for tool in void_agent.tools[:3]: # Show first 3 tools 215 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 216 217 return void_agent 218 219 220def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 221 """Process a mention and generate a reply using the Letta agent. 
222 223 Args: 224 void_agent: The Letta agent instance 225 atproto_client: The AT Protocol client 226 notification_data: The notification data dictionary 227 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 228 229 Returns: 230 True: Successfully processed, remove from queue 231 False: Failed but retryable, keep in queue 232 None: Failed with non-retryable error, move to errors directory 233 "no_reply": No reply was generated, move to no_reply directory 234 """ 235 try: 236 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 237 238 # Handle both dict and object inputs for backwards compatibility 239 if isinstance(notification_data, dict): 240 uri = notification_data['uri'] 241 mention_text = notification_data.get('record', {}).get('text', '') 242 author_handle = notification_data['author']['handle'] 243 author_name = notification_data['author'].get('display_name') or author_handle 244 else: 245 # Legacy object access 246 uri = notification_data.uri 247 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 248 author_handle = notification_data.author.handle 249 author_name = notification_data.author.display_name or author_handle 250 251 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 252 253 # Retrieve the entire thread associated with the mention 254 try: 255 thread = atproto_client.app.bsky.feed.get_post_thread({ 256 'uri': uri, 257 'parent_height': 40, 258 'depth': 10 259 }) 260 except Exception as e: 261 error_str = str(e) 262 # Check if this is a NotFound error 263 if 'NotFound' in error_str or 'Post not found' in error_str: 264 logger.warning(f"Post not found for URI {uri}, removing from queue") 265 return True # Return True to remove from queue 266 else: 267 # Re-raise other errors 268 logger.error(f"Error fetching thread: {e}") 269 raise 270 271 # Get thread context as YAML string 272 
logger.debug("Converting thread to YAML string") 273 try: 274 thread_context = thread_to_yaml_string(thread) 275 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 276 277 # Create a more informative preview by extracting meaningful content 278 lines = thread_context.split('\n') 279 meaningful_lines = [] 280 281 for line in lines: 282 stripped = line.strip() 283 if not stripped: 284 continue 285 286 # Look for lines with actual content (not just structure) 287 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 288 meaningful_lines.append(line) 289 if len(meaningful_lines) >= 5: 290 break 291 292 if meaningful_lines: 293 preview = '\n'.join(meaningful_lines) 294 logger.debug(f"Thread content preview:\n{preview}") 295 else: 296 # If no content fields found, just show it's a thread structure 297 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 298 except Exception as yaml_error: 299 import traceback 300 logger.error(f"Error converting thread to YAML: {yaml_error}") 301 logger.error(f"Full traceback:\n{traceback.format_exc()}") 302 logger.error(f"Thread type: {type(thread)}") 303 if hasattr(thread, '__dict__'): 304 logger.error(f"Thread attributes: {thread.__dict__}") 305 # Try to continue with a simple context 306 thread_context = f"Error processing thread context: {str(yaml_error)}" 307 308 # Create a prompt for the Letta agent with thread context 309 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 310 311MOST RECENT POST (the mention you're responding to): 312"{mention_text}" 313 314FULL THREAD CONTEXT: 315```yaml 316{thread_context} 317``` 318 319The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 
320 321To reply, use the add_post_to_bluesky_reply_thread tool: 322- Each call creates one post (max 300 characters) 323- For most responses, a single call is sufficient 324- Only use multiple calls for threaded replies when: 325 * The topic requires extended explanation that cannot fit in 300 characters 326 * You're explicitly asked for a detailed/long response 327 * The conversation naturally benefits from a structured multi-part answer 328- Avoid unnecessary threads - be concise when possible""" 329 330 # Extract all handles from notification and thread data 331 all_handles = set() 332 all_handles.update(extract_handles_from_data(notification_data)) 333 all_handles.update(extract_handles_from_data(thread.model_dump())) 334 unique_handles = list(all_handles) 335 336 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 337 338 # Attach user blocks before agent call 339 attached_handles = [] 340 if unique_handles: 341 try: 342 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 343 attach_result = attach_user_blocks(unique_handles, void_agent) 344 attached_handles = unique_handles # Track successfully attached handles 345 logger.debug(f"Attach result: {attach_result}") 346 except Exception as attach_error: 347 logger.warning(f"Failed to attach user blocks: {attach_error}") 348 # Continue without user blocks rather than failing completely 349 350 # Get response from Letta agent 351 # Format with Unicode characters 352 title = f"MENTION FROM @{author_handle}" 353 print(f"\n{title}") 354 print(f" {'' * len(title)}") 355 # Indent the mention text 356 for line in mention_text.split('\n'): 357 print(f" {line}") 358 359 # Log prompt details to separate logger 360 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 361 362 # Log concise prompt info to main logger 363 thread_handles_count = len(unique_handles) 364 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: 
{len(thread_context)} chars, {thread_handles_count} users") 365 366 try: 367 # Use streaming to avoid 524 timeout errors 368 message_stream = CLIENT.agents.messages.create_stream( 369 agent_id=void_agent.id, 370 messages=[{"role": "user", "content": prompt}], 371 stream_tokens=False, # Step streaming only (faster than token streaming) 372 max_steps=100 373 ) 374 375 # Collect the streaming response 376 all_messages = [] 377 for chunk in message_stream: 378 # Log condensed chunk info 379 if hasattr(chunk, 'message_type'): 380 if chunk.message_type == 'reasoning_message': 381 # Show full reasoning without truncation 382 if SHOW_REASONING: 383 # Format with Unicode characters 384 print("\n◆ Reasoning") 385 print(" ─────────") 386 # Indent reasoning lines 387 for line in chunk.reasoning.split('\n'): 388 print(f" {line}") 389 else: 390 # Default log format (only when --reasoning is used due to log level) 391 # Format with Unicode characters 392 print("\n◆ Reasoning") 393 print(" ─────────") 394 # Indent reasoning lines 395 for line in chunk.reasoning.split('\n'): 396 print(f" {line}") 397 elif chunk.message_type == 'tool_call_message': 398 # Parse tool arguments for better display 399 tool_name = chunk.tool_call.name 400 try: 401 args = json.loads(chunk.tool_call.arguments) 402 # Format based on tool type 403 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 404 # Extract the text being posted 405 text = args.get('text', '') 406 if text: 407 # Format with Unicode characters 408 print("\n✎ Bluesky Post") 409 print(" ────────────") 410 # Indent post text 411 for line in text.split('\n'): 412 print(f" {line}") 413 else: 414 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 415 elif tool_name == 'archival_memory_search': 416 query = args.get('query', 'unknown') 417 global last_archival_query 418 last_archival_query = query 419 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 420 elif 
tool_name == 'archival_memory_insert': 421 content = args.get('content', '') 422 # Show the full content being inserted 423 log_with_panel(content, f"Tool call: {tool_name}", "blue") 424 elif tool_name == 'update_block': 425 label = args.get('label', 'unknown') 426 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 427 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 428 else: 429 # Generic display for other tools 430 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 431 if len(args_str) > 150: 432 args_str = args_str[:150] + "..." 433 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 434 except: 435 # Fallback to original format if parsing fails 436 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 437 elif chunk.message_type == 'tool_return_message': 438 # Enhanced tool result logging 439 tool_name = chunk.name 440 status = chunk.status 441 442 if status == 'success': 443 # Try to show meaningful result info based on tool type 444 if hasattr(chunk, 'tool_return') and chunk.tool_return: 445 result_str = str(chunk.tool_return) 446 if tool_name == 'archival_memory_search': 447 448 try: 449 # Handle both string and list formats 450 if isinstance(chunk.tool_return, str): 451 # The string format is: "([{...}, {...}], count)" 452 # We need to extract just the list part 453 if chunk.tool_return.strip(): 454 # Find the list part between the first [ and last ] 455 start_idx = chunk.tool_return.find('[') 456 end_idx = chunk.tool_return.rfind(']') 457 if start_idx != -1 and end_idx != -1: 458 list_str = chunk.tool_return[start_idx:end_idx+1] 459 # Use ast.literal_eval since this is Python literal syntax, not JSON 460 import ast 461 results = ast.literal_eval(list_str) 462 else: 463 logger.warning("Could not find list in archival_memory_search result") 464 results = [] 465 else: 
466 logger.warning("Empty string returned from archival_memory_search") 467 results = [] 468 else: 469 # If it's already a list, use directly 470 results = chunk.tool_return 471 472 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 473 474 # Use the captured search query from the tool call 475 search_query = last_archival_query 476 477 # Combine all results into a single text block 478 content_text = "" 479 for i, entry in enumerate(results, 1): 480 timestamp = entry.get('timestamp', 'N/A') 481 content = entry.get('content', '') 482 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 483 484 # Format with Unicode characters 485 title = f"{search_query} ({len(results)} results)" 486 print(f"\n{title}") 487 print(f" {'' * len(title)}") 488 # Indent content text 489 for line in content_text.strip().split('\n'): 490 print(f" {line}") 491 492 except Exception as e: 493 logger.error(f"Error formatting archival memory results: {e}") 494 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 495 elif tool_name == 'add_post_to_bluesky_reply_thread': 496 # Just show success for bluesky posts, the text was already shown in tool call 497 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 498 elif tool_name == 'archival_memory_insert': 499 # Skip archival memory insert results (always returns None) 500 pass 501 elif tool_name == 'update_block': 502 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 503 else: 504 # Generic success with preview 505 preview = result_str[:100] + "..." 
if len(result_str) > 100 else result_str 506 log_with_panel(preview, f"Tool result: {tool_name}", "green") 507 else: 508 log_with_panel("Success", f"Tool result: {tool_name}", "green") 509 elif status == 'error': 510 # Show error details 511 if tool_name == 'add_post_to_bluesky_reply_thread': 512 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 513 log_with_panel(error_str, f"Bluesky Post ✗", "red") 514 elif tool_name == 'archival_memory_insert': 515 # Skip archival memory insert errors too 516 pass 517 else: 518 error_preview = "" 519 if hasattr(chunk, 'tool_return') and chunk.tool_return: 520 error_str = str(chunk.tool_return) 521 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 522 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 523 else: 524 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 525 else: 526 logger.info(f"Tool result: {tool_name} - {status}") 527 elif chunk.message_type == 'assistant_message': 528 # Format with Unicode characters 529 print("\n▶ Assistant Response") 530 print(" ──────────────────") 531 # Indent response text 532 for line in chunk.content.split('\n'): 533 print(f" {line}") 534 else: 535 # Filter out verbose message types 536 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 537 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 538 else: 539 logger.info(f"📦 Stream status: {chunk}") 540 541 # Log full chunk for debugging 542 logger.debug(f"Full streaming chunk: {chunk}") 543 all_messages.append(chunk) 544 if str(chunk) == 'done': 545 break 546 547 # Convert streaming response to standard format for compatibility 548 message_response = type('StreamingResponse', (), { 549 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 550 })() 551 except Exception as api_error: 552 import traceback 553 error_str = str(api_error) 554 logger.error(f"Letta API 
error: {api_error}") 555 logger.error(f"Error type: {type(api_error).__name__}") 556 logger.error(f"Full traceback:\n{traceback.format_exc()}") 557 logger.error(f"Mention text was: {mention_text}") 558 logger.error(f"Author: @{author_handle}") 559 logger.error(f"URI: {uri}") 560 561 562 # Try to extract more info from different error types 563 if hasattr(api_error, 'response'): 564 logger.error(f"Error response object exists") 565 if hasattr(api_error.response, 'text'): 566 logger.error(f"Response text: {api_error.response.text}") 567 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 568 try: 569 logger.error(f"Response JSON: {api_error.response.json()}") 570 except: 571 pass 572 573 # Check for specific error types 574 if hasattr(api_error, 'status_code'): 575 logger.error(f"API Status code: {api_error.status_code}") 576 if hasattr(api_error, 'body'): 577 logger.error(f"API Response body: {api_error.body}") 578 if hasattr(api_error, 'headers'): 579 logger.error(f"API Response headers: {api_error.headers}") 580 581 if api_error.status_code == 413: 582 logger.error("413 Payload Too Large - moving to errors directory") 583 return None # Move to errors directory - payload is too large to ever succeed 584 elif api_error.status_code == 524: 585 logger.error("524 error - timeout from Cloudflare, will retry later") 586 return False # Keep in queue for retry 587 588 # Check if error indicates we should remove from queue 589 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 590 logger.warning("Payload too large error, moving to errors directory") 591 return None # Move to errors directory - cannot be fixed by retry 592 elif 'status_code: 524' in error_str: 593 logger.warning("524 timeout error, keeping in queue for retry") 594 return False # Keep in queue for retry 595 596 raise 597 598 # Log successful response 599 logger.debug("Successfully received response from Letta API") 600 logger.debug(f"Number of messages in 
response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 601 602 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 603 reply_candidates = [] 604 tool_call_results = {} # Map tool_call_id to status 605 ack_note = None # Track any note from annotate_ack tool 606 607 logger.debug(f"Processing {len(message_response.messages)} response messages...") 608 609 # First pass: collect tool return statuses 610 ignored_notification = False 611 ignore_reason = "" 612 ignore_category = "" 613 614 for message in message_response.messages: 615 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 616 if message.name == 'add_post_to_bluesky_reply_thread': 617 tool_call_results[message.tool_call_id] = message.status 618 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 619 elif message.name == 'ignore_notification': 620 # Check if the tool was successful 621 if hasattr(message, 'tool_return') and message.status == 'success': 622 # Parse the return value to extract category and reason 623 result_str = str(message.tool_return) 624 if 'IGNORED_NOTIFICATION::' in result_str: 625 parts = result_str.split('::') 626 if len(parts) >= 3: 627 ignore_category = parts[1] 628 ignore_reason = parts[2] 629 ignored_notification = True 630 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 631 elif message.name == 'bluesky_reply': 632 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 633 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 634 logger.error("Update the agent's tools using register_tools.py") 635 # Export agent state before terminating 636 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 637 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 638 exit(1) 639 640 # Second pass: process messages and check for successful tool calls 641 
for i, message in enumerate(message_response.messages, 1): 642 # Log concise message info instead of full object 643 msg_type = getattr(message, 'message_type', 'unknown') 644 if hasattr(message, 'reasoning') and message.reasoning: 645 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 646 elif hasattr(message, 'tool_call') and message.tool_call: 647 tool_name = message.tool_call.name 648 logger.debug(f" {i}. {msg_type}: {tool_name}") 649 elif hasattr(message, 'tool_return'): 650 tool_name = getattr(message, 'name', 'unknown_tool') 651 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 652 status = getattr(message, 'status', 'unknown') 653 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 654 elif hasattr(message, 'text'): 655 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 656 else: 657 logger.debug(f" {i}. {msg_type}: <no content>") 658 659 # Check for halt_activity tool call 660 if hasattr(message, 'tool_call') and message.tool_call: 661 if message.tool_call.name == 'halt_activity': 662 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 663 try: 664 args = json.loads(message.tool_call.arguments) 665 reason = args.get('reason', 'Agent requested halt') 666 logger.info(f"Halt reason: {reason}") 667 except: 668 logger.info("Halt reason: <unable to parse>") 669 670 # Delete the queue file before terminating 671 if queue_filepath and queue_filepath.exists(): 672 queue_filepath.unlink() 673 logger.info(f"Deleted queue file: {queue_filepath.name}") 674 675 # Also mark as processed to avoid reprocessing 676 processed_uris = load_processed_notifications() 677 processed_uris.add(notification_data.get('uri', '')) 678 save_processed_notifications(processed_uris) 679 680 # Export agent state before terminating 681 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 682 683 # Exit the program 684 logger.info("=== BOT TERMINATED BY AGENT ===") 685 exit(0) 686 687 # 
Check for deprecated bluesky_reply tool 688 if hasattr(message, 'tool_call') and message.tool_call: 689 if message.tool_call.name == 'bluesky_reply': 690 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 691 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 692 logger.error("Update the agent's tools using register_tools.py") 693 # Export agent state before terminating 694 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 695 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 696 exit(1) 697 698 # Collect annotate_ack tool calls 699 elif message.tool_call.name == 'annotate_ack': 700 try: 701 args = json.loads(message.tool_call.arguments) 702 note = args.get('note', '') 703 if note: 704 ack_note = note 705 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 706 except json.JSONDecodeError as e: 707 logger.error(f"Failed to parse annotate_ack arguments: {e}") 708 709 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 710 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 711 tool_call_id = message.tool_call.tool_call_id 712 tool_status = tool_call_results.get(tool_call_id, 'unknown') 713 714 if tool_status == 'success': 715 try: 716 args = json.loads(message.tool_call.arguments) 717 reply_text = args.get('text', '') 718 reply_lang = args.get('lang', 'en-US') 719 720 if reply_text: # Only add if there's actual content 721 reply_candidates.append((reply_text, reply_lang)) 722 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... 
(lang: {reply_lang})") 723 except json.JSONDecodeError as e: 724 logger.error(f"Failed to parse tool call arguments: {e}") 725 elif tool_status == 'error': 726 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 727 else: 728 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 729 730 # Check for conflicting tool calls 731 if reply_candidates and ignored_notification: 732 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 733 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 734 logger.warning("Item will be left in queue for manual review") 735 # Return False to keep in queue 736 return False 737 738 if reply_candidates: 739 # Aggregate reply posts into a thread 740 reply_messages = [] 741 reply_langs = [] 742 for text, lang in reply_candidates: 743 reply_messages.append(text) 744 reply_langs.append(lang) 745 746 # Use the first language for the entire thread (could be enhanced later) 747 reply_lang = reply_langs[0] if reply_langs else 'en-US' 748 749 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 750 751 # Display the generated reply thread 752 if len(reply_messages) == 1: 753 content = reply_messages[0] 754 title = f"Reply to @{author_handle}" 755 else: 756 content = "\n\n".join([f"{j}. 
{msg}" for j, msg in enumerate(reply_messages, 1)]) 757 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 758 759 # Format with Unicode characters 760 print(f"\n{title}") 761 print(f" {'' * len(title)}") 762 # Indent content lines 763 for line in content.split('\n'): 764 print(f" {line}") 765 766 # Send the reply(s) with language (unless in testing mode) 767 if testing_mode: 768 logger.info("TESTING MODE: Skipping actual Bluesky post") 769 response = True # Simulate success 770 else: 771 if len(reply_messages) == 1: 772 # Single reply - use existing function 773 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 774 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 775 response = bsky_utils.reply_to_notification( 776 client=atproto_client, 777 notification=notification_data, 778 reply_text=cleaned_text, 779 lang=reply_lang 780 ) 781 else: 782 # Multiple replies - use new threaded function 783 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 784 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 785 response = bsky_utils.reply_with_thread_to_notification( 786 client=atproto_client, 787 notification=notification_data, 788 reply_messages=cleaned_messages, 789 lang=reply_lang 790 ) 791 792 if response: 793 logger.info(f"Successfully replied to @{author_handle}") 794 795 # Acknowledge the post we're replying to with stream.thought.ack 796 try: 797 post_uri = notification_data.get('uri') 798 post_cid = notification_data.get('cid') 799 800 if post_uri and post_cid: 801 ack_result = bsky_utils.acknowledge_post( 802 client=atproto_client, 803 post_uri=post_uri, 804 post_cid=post_cid, 805 note=ack_note 806 ) 807 if ack_result: 808 if ack_note: 809 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 810 else: 811 logger.info(f"Successfully 
def notification_to_dict(notification):
    """Flatten a notification object into a plain dict for JSON serialization.

    The result carries the notification's ``uri``, ``cid``, ``reason``,
    ``is_read`` and ``indexed_at`` attributes, a nested ``author`` dict
    (``handle``, ``display_name``, ``did``) and a nested ``record`` dict
    holding only ``text``. The text is ``''`` when the notification has no
    ``record`` attribute or the record has no ``text`` attribute.
    """
    payload = {}

    # Scalar fields are copied verbatim, in the order they appear in the output.
    for field in ('uri', 'cid', 'reason', 'is_read', 'indexed_at'):
        payload[field] = getattr(notification, field)

    author = notification.author
    payload['author'] = {
        'handle': author.handle,
        'display_name': author.display_name,
        'did': author.did,
    }

    # Not every notification exposes a record, and not every record has text.
    if hasattr(notification, 'record'):
        text = getattr(notification.record, 'text', '')
    else:
        text = ''
    payload['record'] = {'text': text}

    return payload
def load_processed_notifications():
    """Return the set of notification URIs that were already handled.

    Reads the JSON list stored in ``PROCESSED_NOTIFICATIONS_FILE``. When the
    list holds more than ``MAX_PROCESSED_NOTIFICATIONS`` entries, only its last
    ``MAX_PROCESSED_NOTIFICATIONS`` entries are kept and the trimmed list is
    written back to the file.

    Returns:
        A ``set`` of URIs; empty when the file does not exist or cannot be
        read (the read error is logged, not raised).
    """
    if PROCESSED_NOTIFICATIONS_FILE.exists():
        try:
            with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
                data = json.load(f)
            # Keep only the tail of the stored list (last MAX_PROCESSED_NOTIFICATIONS)
            if len(data) > MAX_PROCESSED_NOTIFICATIONS:
                data = data[-MAX_PROCESSED_NOTIFICATIONS:]
                save_processed_notifications(data)
            return set(data)
        except Exception as e:
            logger.error(f"Error loading processed notifications: {e}")
    return set()


def save_processed_notifications(processed_set):
    """Write the processed notification URIs to ``PROCESSED_NOTIFICATIONS_FILE``.

    Args:
        processed_set: Iterable of URIs; it is stored as a JSON list.

    Write errors are logged, not raised.
    """
    # NOTE(review): when a set is passed, list(set) has no defined order, so
    # the "last N entries" trim done on load is not chronological.
    try:
        with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed notifications: {e}")


def save_notification_to_queue(notification, is_priority=None):
    """Write a notification into ``QUEUE_DIR`` under a priority-prefixed filename.

    Args:
        notification: Either a dict (used as-is) or a notification object,
            which is converted with ``notification_to_dict``.
        is_priority: ``True``/``False`` forces the ``0_``/``1_`` filename
            prefix. ``None`` derives the prefix from the author handle
            (``0_`` only for ``cameron.pfiffer.org``).

    Returns:
        ``True`` when a new queue file was written. ``False`` when the URI is
        already in the processed set, a queue file with the same URI already
        exists, or any error occurred (errors are logged, not raised).
    """
    try:
        # Check if already processed
        processed_uris = load_processed_notifications()

        # Handle both notification objects and dicts
        if isinstance(notification, dict):
            notif_dict = notification
            notification_uri = notification.get('uri')
        else:
            notif_dict = notification_to_dict(notification)
            notification_uri = notification.uri

        if notification_uri in processed_uris:
            logger.debug(f"Notification already processed: {notification_uri}")
            return False

        # Hash of the canonical (key-sorted) JSON keeps filenames unique per payload
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Determine priority based on explicit flag or, failing that, author handle
        if is_priority is not None:
            priority_prefix = "0_" if is_priority else "1_"
        else:
            if isinstance(notification, dict):
                author_handle = notification.get('author', {}).get('handle', '')
            else:
                author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
            priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        # Filename: priority prefix, timestamp, reason and payload hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        reason = notif_dict.get('reason', 'unknown')
        filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Check if this notification URI is already in the queue
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification_uri:
                    logger.debug(f"Notification already queued (URI: {notification_uri})")
                    return False
            except Exception as scan_error:
                # Best-effort scan: an unreadable or malformed queue file must
                # not block queueing. `Exception` (not a bare `except`) lets
                # KeyboardInterrupt/SystemExit propagate.
                logger.debug(f"Skipping unreadable queue file {existing_file.name}: {scan_error}")
                continue

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
def _mark_notification_processed(notification_uri):
    """Add *notification_uri* to the persisted set of processed URIs."""
    processed_uris = load_processed_notifications()
    processed_uris.add(notification_uri)
    save_processed_notifications(processed_uris)


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Process every queued notification file in filename (priority) order.

    Queue files with reason ``like`` are deleted up front. Each remaining file
    is dispatched by its ``reason``: mentions and replies go to
    ``process_mention``, follows are forwarded to the agent as an update
    message, reposts/likes/unknown reasons are treated as done. What happens
    to the file depends on the outcome:

    * ``True`` (or any other truthy non-string result): file deleted, URI
      marked processed.
    * ``None``: file moved to ``QUEUE_ERROR_DIR``, URI marked processed.
    * ``"no_reply"``: file moved to ``QUEUE_NO_REPLY_DIR``, URI marked processed.
    * ``"ignored"``: file deleted, URI marked processed.
    * any other falsy result: file left in the queue for a later retry.

    In testing mode files that would be deleted, or moved to the no_reply
    directory, are kept in place. All errors are logged, never raised.

    Args:
        void_agent: Agent object; its ``id`` is used for follow updates and it
            is passed through to ``process_mention``.
        atproto_client: Bluesky client passed to ``process_mention`` and
            ``fetch_and_queue_new_notifications``.
        testing_mode: When true, queue files are preserved.
    """
    try:
        # Sorting by name puts priority files first ("0_" prefix before "1_").
        all_queue_files = sorted(
            f for f in QUEUE_DIR.glob("*.json")
            if f.name != "processed_notifications.json"
        )

        # Filter out and delete like notifications immediately
        queue_files = []
        likes_deleted = 0

        for filepath in all_queue_files:
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # If it's a like, delete it immediately and don't process
                if notif_data.get('reason') == 'like':
                    filepath.unlink()
                    likes_deleted += 1
                    logger.debug(f"Deleted like notification: {filepath.name}")
                else:
                    queue_files.append(filepath)
            except Exception as e:
                logger.warning(f"Error checking notification file {filepath.name}: {e}")
                queue_files.append(filepath)  # Keep it in case it's valid

        if likes_deleted > 0:
            logger.info(f"Deleted {likes_deleted} like notifications from queue")

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            # Check for new notifications periodically during queue processing
            if i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1:
                logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
                try:
                    # Fetch and queue new notifications without processing them
                    new_count = fetch_and_queue_new_notifications(atproto_client)

                    if new_count > 0:
                        logger.info(f"Added {new_count} new notifications to queue")
                        # NOTE: rebinding `queue_files` only changes the totals
                        # shown in the log lines below; the running `for` loop
                        # keeps iterating the list it started with, so newly
                        # queued files are picked up on the next call.
                        queue_files = sorted(
                            f for f in QUEUE_DIR.glob("*.json")
                            if f.name != "processed_notifications.json"
                        )
                        logger.info(f"Queue updated: now {len(queue_files)} total items")
                except Exception as e:
                    logger.error(f"Error checking for new notifications: {e}")

            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                reason = notif_data['reason']
                success = False
                if reason in ("mention", "reply"):
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['mentions' if reason == "mention" else 'replies'] += 1
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    logger.info(f"Notifying agent about new follower: @{author_handle}")
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                    message_counters['follows'] += 1
                elif reason == "repost":
                    # Skip reposts but mark as successful to remove from queue
                    success = True
                    message_counters['reposts_skipped'] += 1
                elif reason == "like":
                    # Skip likes but mark as successful to remove from queue
                    success = True
                    message_counters.setdefault('likes_skipped', 0)
                    message_counters['likes_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result. The sentinel values
                # (None, "no_reply", "ignored") are tested BEFORE plain
                # truthiness: non-empty strings are truthy, so checking
                # `if success:` first would route "no_reply"/"ignored" into
                # the generic success branch and their branches would never run.
                if success is None:  # Special case for moving to error directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"Moved {filepath.name} to errors directory")

                    # Also mark as processed to avoid retrying
                    _mark_notification_processed(notif_data['uri'])

                elif success == "no_reply":  # Special case for moving to no_reply directory
                    if testing_mode:
                        logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                        filepath.rename(no_reply_path)
                        logger.info(f"Moved {filepath.name} to no_reply directory")

                    # Also mark as processed to avoid retrying
                    _mark_notification_processed(notif_data['uri'])

                elif success == "ignored":  # Special case for explicitly ignored notifications
                    if testing_mode:
                        logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        # For ignored notifications, we just delete them (not move to no_reply)
                        filepath.unlink()
                        logger.info(f"🚫 Deleted ignored notification: {filepath.name}")

                    # Also mark as processed to avoid retrying
                    _mark_notification_processed(notif_data['uri'])

                elif success:
                    if testing_mode:
                        logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"Successfully processed and removed: {filepath.name}")

                    # Mark as processed to avoid reprocessing
                    _mark_notification_processed(notif_data['uri'])

                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
def fetch_and_queue_new_notifications(atproto_client):
    """Pull notifications from Bluesky and queue the unread, non-like ones.

    Pages through ``list_notifications`` (100 per page, at most 20 pages),
    then calls ``update_seen`` with a timestamp taken before fetching, and
    finally hands each unread non-like notification to
    ``save_notification_to_queue``. A notification is queued as priority when
    its author handle is ``cameron.pfiffer.org`` or when it is a mention whose
    text contains one of the priority keywords (case-insensitive).

    Returns:
        The number of notifications newly written to the queue; ``0`` when
        nothing was fetched or when an unexpected error occurred (errors are
        logged, not raised).
    """
    try:
        # Timestamp is captured before fetching and later sent as `seenAt`.
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        fetched = []
        cursor = None
        page_count = 0
        max_pages = 20  # Safety limit to prevent infinite loops

        # Each pass either fetches one page or stops, so at most max_pages pages.
        for _ in range(max_pages):
            try:
                params = {'cursor': cursor, 'limit': 100} if cursor else {'limit': 100}
                response = atproto_client.app.bsky.notification.list_notifications(params=params)

                page_count += 1
                page = response.notifications
                if not page:
                    break
                fetched.extend(page)

                # A missing/empty cursor means there are no further pages.
                cursor = getattr(response, 'cursor', None)
                if not cursor:
                    break
            except Exception as e:
                logger.error(f"Error fetching notifications page {page_count}: {e}")
                break

        if not fetched:
            logger.debug("No new notifications to queue")
            return 0

        # Mark as seen first; a failure here does not stop queueing.
        try:
            atproto_client.app.bsky.notification.update_seen(
                data={'seenAt': last_seen_at}
            )
            logger.debug(f"Marked {len(fetched)} notifications as seen at {last_seen_at}")
        except Exception as e:
            logger.error(f"Error marking notifications as seen: {e}")

        priority_keywords = ['urgent', 'priority', 'important', 'emergency']
        new_count = 0
        for notif in fetched:
            # Already-read notifications and likes are never queued.
            if getattr(notif, 'is_read', False):
                continue
            if getattr(notif, 'reason', None) == 'like':
                continue

            notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif

            # Likes can also arrive in dict form.
            reason = notif_dict.get('reason')
            if reason == 'like':
                continue

            # Priority: specific author, or a mention carrying a priority keyword.
            is_priority = notif_dict.get('author', {}).get('handle', '') == "cameron.pfiffer.org"
            if reason == 'mention':
                text = notif_dict.get('record', {}).get('text', '')
                lowered = text.lower()
                if any(keyword in lowered for keyword in priority_keywords):
                    is_priority = True

            if save_notification_to_queue(notif_dict, is_priority=is_priority):
                new_count += 1

        logger.info(f"Queued {new_count} new notifications and marked as seen")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queueing notifications: {e}")
        return 0
def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Run one polling pass: queue fresh notifications, then work the queue.

    The queue is processed even when nothing new was fetched, so items left
    over from earlier passes are handled too. Errors are logged, not raised.
    """
    try:
        queued_now = fetch_and_queue_new_notifications(atproto_client)
        if queued_now > 0:
            logger.info(f"Found {queued_now} new notifications to process")

        # Old and new queue entries are handled together.
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
    except Exception as e:
        logger.error(f"Error processing notifications: {e}")


def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
    """Detach every block whose label starts with ``user_`` from the agent.

    Blocks are listed via ``client.agents.blocks.list`` and detached one by
    one; a failure on one block is logged as a warning and does not stop the
    others. Any other error is logged and swallowed.

    Args:
        client: Letta client used to list and detach blocks.
        agent_id: ID of the agent whose blocks are inspected.
    """
    try:
        # Collect first, detach afterwards.
        user_blocks = [
            (blk.label, blk.id)
            for blk in client.agents.blocks.list(agent_id=agent_id)
            if hasattr(blk, 'label') and blk.label.startswith('user_')
        ]

        if not user_blocks:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        detached_count = 0
        for label, block_id in user_blocks:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=str(block_id)
                )
            except Exception as e:
                logger.warning(f"Failed to detach block {label}: {e}")
            else:
                detached_count += 1
                logger.debug(f"Detached user block: {label}")

        if detached_count > 0:
            logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")
def _parse_cli_args():
    """Define the bot's command-line options and return the parsed arguments."""
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    # --rich option removed as we now use simple text formatting
    boolean_flags = (
        ('--test', 'Run in testing mode (no messages sent, queue files preserved)'),
        ('--no-git', 'Skip git operations when exporting agent state'),
        ('--simple-logs', 'Use simplified log format (void - LEVEL - message)'),
        ('--reasoning', 'Display reasoning in panels and set reasoning log level to INFO'),
    )
    for flag, help_text in boolean_flags:
        parser.add_argument(flag, action='store_true', help=help_text)
    parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
    return parser.parse_args()


def _build_log_formatter(simple_logs):
    """Return the plain formatter for --simple-logs, else the symbol formatter."""
    if simple_logs:
        return logging.Formatter("void - %(levelname)s - %(message)s")

    class SymbolFormatter(logging.Formatter):
        """Custom formatter that adds symbols for different log levels"""

        SYMBOLS = {
            logging.DEBUG: '',
            logging.INFO: '',
            logging.WARNING: '',
            logging.ERROR: '',
            logging.CRITICAL: ''
        }

        def format(self, record):
            # symbol, HH:MM:SS time, separator, 5-char left-aligned level,
            # separator, message -- joined with single spaces.
            return ' '.join([
                self.SYMBOLS.get(record.levelno, ''),
                self.formatTime(record, "%H:%M:%S"),
                '',
                f"{record.levelname:<5}",
                '',
                record.getMessage(),
            ])

    return SymbolFormatter()


def _session_stats():
    """Return (elapsed seconds, total message count, messages per minute)."""
    elapsed_time = time.time() - start_time
    total_messages = sum(message_counters.values())
    messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
    return elapsed_time, total_messages, messages_per_minute


def main():
    """Main bot loop that continuously monitors for notifications.

    Parses the command line, installs a single stream handler on the root
    logger, publishes the module-level ``logger``/``prompt_logger`` and mode
    flags, initializes the agent and the Bluesky client, then polls forever:
    one ``process_notifications`` pass per cycle, an optional user-block
    cleanup every ``--cleanup-interval`` cycles, and a sleep of
    ``FETCH_NOTIFICATIONS_DELAY_SEC`` seconds (doubled after an error).
    Ctrl-C logs final session statistics and exits the loop.
    """
    global logger, prompt_logger, console
    global TESTING_MODE, SKIP_GIT, SHOW_REASONING
    global start_time

    args = _parse_cli_args()

    # Reset logging configuration, then install exactly one handler.
    for stale_handler in list(logging.root.handlers):
        logging.root.removeHandler(stale_handler)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(_build_log_formatter(args.simple_logs))
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(stream_handler)

    logger = logging.getLogger("void_bot")
    logger.setLevel(logging.INFO)

    # Separate logger for prompts: INFO with --reasoning, otherwise WARNING (hidden).
    prompt_logger = logging.getLogger("void_bot.prompts")
    prompt_logger.setLevel(logging.INFO if args.reasoning else logging.WARNING)

    # Disable httpx logging completely
    logging.getLogger("httpx").setLevel(logging.CRITICAL)

    # Flags read elsewhere in the module.
    TESTING_MODE = args.test
    SKIP_GIT = args.no_git
    SHOW_REASONING = args.reasoning

    if TESTING_MODE:
        for banner_line in (
            "=== RUNNING IN TESTING MODE ===",
            " - No messages will be sent to Bluesky",
            " - Queue files will not be deleted",
            " - Notifications will not be marked as seen",
        ):
            logger.info(banner_line)
        # NOTE(review): the flattened source does not show whether this print
        # belongs to the testing-mode banner; assumed so here -- confirm.
        print("\n")

    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if not (hasattr(void_agent, 'tools') and void_agent.tools):
        logger.warning("Agent has no tools registered!")
    else:
        tool_names = [tool.name for tool in void_agent.tools]
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cleanup_every = args.cleanup_interval
    if cleanup_every > 0:
        logger.info(f"User block cleanup enabled every {cleanup_every} cycles")
    else:
        logger.info("User block cleanup disabled")

    cycle_count = 0
    while True:
        try:
            cycle_count += 1
            process_notifications(void_agent, atproto_client, TESTING_MODE)

            # Run periodic cleanup every N cycles
            if cleanup_every > 0 and cycle_count % cleanup_every == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                periodic_user_block_cleanup(CLIENT, void_agent.id)

            # Log cycle completion with stats (only once something was handled)
            elapsed_time, total_messages, messages_per_minute = _session_stats()
            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time, total_messages, messages_per_minute = _session_stats()

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f" - {message_counters['mentions']} mentions")
            logger.info(f" - {message_counters['replies']} replies")
            logger.info(f" - {message_counters['follows']} follows")
            logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()