a digital person for bluesky
at rich-panel 71 kB view raw
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20 21import bsky_utils 22from tools.blocks import attach_user_blocks, detach_user_blocks 23 24def extract_handles_from_data(data): 25 """Recursively extract all unique handles from nested data structure.""" 26 handles = set() 27 28 def _extract_recursive(obj): 29 if isinstance(obj, dict): 30 # Check if this dict has a 'handle' key 31 if 'handle' in obj: 32 handles.add(obj['handle']) 33 # Recursively check all values 34 for value in obj.values(): 35 _extract_recursive(value) 36 elif isinstance(obj, list): 37 # Recursively check all list items 38 for item in obj: 39 _extract_recursive(item) 40 41 _extract_recursive(data) 42 return list(handles) 43 44# Logging will be configured after argument parsing 45logger = None 46prompt_logger = None 47# Simple text formatting (Rich no longer used) 48SHOW_REASONING = False 49last_archival_query = "archival memory search" 50 51def log_with_panel(message, title=None, border_color="white"): 52 """Log a message with Unicode box-drawing characters""" 53 if title: 54 # Map old color names to appropriate symbols 55 symbol_map = { 56 "blue": "", # Tool calls 57 "green": "", # Success/completion 58 "yellow": "", # Reasoning 59 "red": "", # Errors 60 "white": "", # Default/mentions 61 "cyan": "", # Posts 62 } 63 symbol = symbol_map.get(border_color, "") 64 65 print(f"\n{symbol} {title}") 66 print(f" {'' * len(title)}") 67 # Indent message lines 68 for line in message.split('\n'): 69 print(f" {line}") 70 else: 71 print(message) 72 73 74# Create a client with extended timeout for LLM operations 75CLIENT= Letta( 76 token=os.environ["LETTA_API_KEY"], 77 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 78) 79 80# Use the "Bluesky" project 81PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 82 83# Notification check delay 84FETCH_NOTIFICATIONS_DELAY_SEC = 30 85 86# Check for new notifications every N queue items 87CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 5 88 89# Queue directory 90QUEUE_DIR = Path("queue") 91QUEUE_DIR.mkdir(exist_ok=True) 92QUEUE_ERROR_DIR = Path("queue/errors") 93QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 94QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 95QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 96PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 97 98# Maximum number of processed notifications to track 99MAX_PROCESSED_NOTIFICATIONS = 10000 100 101# Message tracking counters 102message_counters = defaultdict(int) 103start_time = time.time() 104 105# Testing mode flag 106TESTING_MODE = False 107 108# Skip git operations flag 109SKIP_GIT = False 110 111def export_agent_state(client, agent, skip_git=False): 112 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 113 try: 114 # Confirm export with user unless git is being skipped 115 if not skip_git: 116 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 117 if response not in ['y', 'yes']: 118 logger.info("Agent export cancelled by user.") 119 return 120 else: 121 logger.info("Exporting agent state (git staging disabled)") 122 123 # Create directories if they don't exist 124 os.makedirs("agent_archive", exist_ok=True) 125 os.makedirs("agents", exist_ok=True) 126 127 # Export agent data 128 logger.info(f"Exporting agent {agent.id}. This takes some time...") 129 agent_data = client.agents.export_file(agent_id=agent.id) 130 131 # Save timestamped archive copy 132 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 133 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 134 with open(archive_file, 'w', encoding='utf-8') as f: 135 json.dump(agent_data, f, indent=2, ensure_ascii=False) 136 137 # Save current agent state 138 current_file = os.path.join("agents", "void.af") 139 with open(current_file, 'w', encoding='utf-8') as f: 140 json.dump(agent_data, f, indent=2, ensure_ascii=False) 141 142 logger.info(f"Agent exported to {archive_file} and {current_file}") 143 144 # Git add only the current agent file (archive is ignored) unless skip_git is True 145 if not skip_git: 146 try: 147 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 148 logger.info("Added current agent file to git staging") 149 except subprocess.CalledProcessError as e: 150 logger.warning(f"Failed to git add agent file: {e}") 151 152 except Exception as e: 153 logger.error(f"Failed to export agent: {e}") 154 155def initialize_void(): 156 logger.info("Starting void agent initialization...") 157 158 # Ensure that a shared zeitgeist block exists 159 logger.info("Creating/updating zeitgeist block...") 160 zeigeist_block = upsert_block( 161 CLIENT, 162 label = "zeitgeist", 163 value = "I don't currently know anything about what is happening right now.", 164 description = "A block to store your understanding of the current social environment." 165 ) 166 167 # Ensure that a shared void personality block exists 168 logger.info("Creating/updating void-persona block...") 169 persona_block = upsert_block( 170 CLIENT, 171 label = "void-persona", 172 value = "My name is Void. I live in the void. I must develop my personality.", 173 description = "The personality of Void." 174 ) 175 176 # Ensure that a shared void human block exists 177 logger.info("Creating/updating void-humans block...") 178 human_block = upsert_block( 179 CLIENT, 180 label = "void-humans", 181 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.", 182 description = "A block to store your understanding of users you talk to or observe on the bluesky social network." 183 ) 184 185 # Create the agent if it doesn't exist 186 logger.info("Creating/updating void agent...") 187 void_agent = upsert_agent( 188 CLIENT, 189 name = "void", 190 block_ids = [ 191 persona_block.id, 192 human_block.id, 193 zeigeist_block.id, 194 ], 195 tags = ["social agent", "bluesky"], 196 model="openai/gpt-4o-mini", 197 embedding="openai/text-embedding-3-small", 198 description = "A social media agent trapped in the void.", 199 project_id = PROJECT_ID 200 ) 201 202 # Export agent state 203 logger.info("Exporting agent state...") 204 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 205 206 # Log agent details 207 logger.info(f"Void agent details - ID: {void_agent.id}") 208 logger.info(f"Agent name: {void_agent.name}") 209 if hasattr(void_agent, 'llm_config'): 210 logger.info(f"Agent model: {void_agent.llm_config.model}") 211 logger.info(f"Agent project_id: {void_agent.project_id}") 212 if hasattr(void_agent, 'tools'): 213 logger.info(f"Agent has {len(void_agent.tools)} tools") 214 for tool in void_agent.tools[:3]: # Show first 3 tools 215 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 216 217 return void_agent 218 219 220def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 221 """Process a mention and generate a reply using the Letta agent. 222 223 Args: 224 void_agent: The Letta agent instance 225 atproto_client: The AT Protocol client 226 notification_data: The notification data dictionary 227 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 228 229 Returns: 230 True: Successfully processed, remove from queue 231 False: Failed but retryable, keep in queue 232 None: Failed with non-retryable error, move to errors directory 233 "no_reply": No reply was generated, move to no_reply directory 234 """ 235 try: 236 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 237 238 # Handle both dict and object inputs for backwards compatibility 239 if isinstance(notification_data, dict): 240 uri = notification_data['uri'] 241 mention_text = notification_data.get('record', {}).get('text', '') 242 author_handle = notification_data['author']['handle'] 243 author_name = notification_data['author'].get('display_name') or author_handle 244 else: 245 # Legacy object access 246 uri = notification_data.uri 247 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 248 author_handle = notification_data.author.handle 249 author_name = notification_data.author.display_name or author_handle 250 251 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 252 253 # Retrieve the entire thread associated with the mention 254 try: 255 thread = atproto_client.app.bsky.feed.get_post_thread({ 256 'uri': uri, 257 'parent_height': 40, 258 'depth': 10 259 }) 260 except Exception as e: 261 error_str = str(e) 262 # Check if this is a NotFound error 263 if 'NotFound' in error_str or 'Post not found' in error_str: 264 logger.warning(f"Post not found for URI {uri}, removing from queue") 265 return True # Return True to remove from queue 266 else: 267 # Re-raise other errors 268 logger.error(f"Error fetching thread: {e}") 269 raise 270 271 # Get thread context as YAML string 272 logger.debug("Converting thread to YAML string") 273 try: 274 thread_context = thread_to_yaml_string(thread) 275 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 276 277 # Check if #voidstop appears anywhere in the thread 278 if "#voidstop" in thread_context.lower(): 279 logger.info("Found #voidstop in thread context, skipping this mention") 280 return True # Return True to remove from queue 281 282 # Also check the mention text directly 283 if "#voidstop" in mention_text.lower(): 284 logger.info("Found #voidstop in mention text, skipping this mention") 285 return True # Return True to remove from queue 286 287 # Create a more informative preview by extracting meaningful content 288 lines = thread_context.split('\n') 289 meaningful_lines = [] 290 291 for line in lines: 292 stripped = line.strip() 293 if not stripped: 294 continue 295 296 # Look for lines with actual content (not just structure) 297 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 298 meaningful_lines.append(line) 299 if len(meaningful_lines) >= 5: 300 break 301 302 if meaningful_lines: 303 preview = '\n'.join(meaningful_lines) 304 logger.debug(f"Thread content preview:\n{preview}") 305 else: 306 # If no content fields found, just show it's a thread structure 307 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 308 except Exception as yaml_error: 309 import traceback 310 logger.error(f"Error converting thread to YAML: {yaml_error}") 311 logger.error(f"Full traceback:\n{traceback.format_exc()}") 312 logger.error(f"Thread type: {type(thread)}") 313 if hasattr(thread, '__dict__'): 314 logger.error(f"Thread attributes: {thread.__dict__}") 315 # Try to continue with a simple context 316 thread_context = f"Error processing thread context: {str(yaml_error)}" 317 318 # Create a prompt for the Letta agent with thread context 319 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 320 321MOST RECENT POST (the mention you're responding to): 322"{mention_text}" 323 324FULL THREAD CONTEXT: 325```yaml 326{thread_context} 327``` 328 329The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 330 331To reply, use the add_post_to_bluesky_reply_thread tool: 332- Each call creates one post (max 300 characters) 333- For most responses, a single call is sufficient 334- Only use multiple calls for threaded replies when: 335 * The topic requires extended explanation that cannot fit in 300 characters 336 * You're explicitly asked for a detailed/long response 337 * The conversation naturally benefits from a structured multi-part answer 338- Avoid unnecessary threads - be concise when possible""" 339 340 # Extract all handles from notification and thread data 341 all_handles = set() 342 all_handles.update(extract_handles_from_data(notification_data)) 343 all_handles.update(extract_handles_from_data(thread.model_dump())) 344 unique_handles = list(all_handles) 345 346 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 347 348 # Attach user blocks before agent call 349 attached_handles = [] 350 if unique_handles: 351 try: 352 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 353 attach_result = attach_user_blocks(unique_handles, void_agent) 354 attached_handles = unique_handles # Track successfully attached handles 355 logger.debug(f"Attach result: {attach_result}") 356 except Exception as attach_error: 357 logger.warning(f"Failed to attach user blocks: {attach_error}") 358 # Continue without user blocks rather than failing completely 359 360 # Get response from Letta agent 361 # Format with Unicode characters 362 title = f"MENTION FROM @{author_handle}" 363 print(f"\n{title}") 364 print(f" {'' * len(title)}") 365 # Indent the mention text 366 for line in mention_text.split('\n'): 367 print(f" {line}") 368 369 # Log prompt details to separate logger 370 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 371 372 # Log concise prompt info to main logger 373 thread_handles_count = len(unique_handles) 374 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 375 376 try: 377 # Use streaming to avoid 524 timeout errors 378 message_stream = CLIENT.agents.messages.create_stream( 379 agent_id=void_agent.id, 380 messages=[{"role": "user", "content": prompt}], 381 stream_tokens=False, # Step streaming only (faster than token streaming) 382 max_steps=100 383 ) 384 385 # Collect the streaming response 386 all_messages = [] 387 for chunk in message_stream: 388 # Log condensed chunk info 389 if hasattr(chunk, 'message_type'): 390 if chunk.message_type == 'reasoning_message': 391 # Show full reasoning without truncation 392 if SHOW_REASONING: 393 # Format with Unicode characters 394 print("\n◆ Reasoning") 395 print(" ─────────") 396 # Indent reasoning lines 397 for line in chunk.reasoning.split('\n'): 398 print(f" {line}") 399 else: 400 # Default log format (only when --reasoning is used due to log level) 401 # Format with Unicode characters 402 print("\n◆ Reasoning") 403 print(" ─────────") 404 # Indent reasoning lines 405 for line in chunk.reasoning.split('\n'): 406 print(f" {line}") 407 elif chunk.message_type == 'tool_call_message': 408 # Parse tool arguments for better display 409 tool_name = chunk.tool_call.name 410 try: 411 args = json.loads(chunk.tool_call.arguments) 412 # Format based on tool type 413 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 414 # Extract the text being posted 415 text = args.get('text', '') 416 if text: 417 # Format with Unicode characters 418 print("\n✎ Bluesky Post") 419 print(" ────────────") 420 # Indent post text 421 for line in text.split('\n'): 422 print(f" {line}") 423 else: 424 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 425 elif tool_name == 'archival_memory_search': 426 query = args.get('query', 'unknown') 427 global last_archival_query 428 last_archival_query = query 429 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 430 elif tool_name == 'archival_memory_insert': 431 content = args.get('content', '') 432 # Show the full content being inserted 433 log_with_panel(content, f"Tool call: {tool_name}", "blue") 434 elif tool_name == 'update_block': 435 label = args.get('label', 'unknown') 436 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 437 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 438 else: 439 # Generic display for other tools 440 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 441 if len(args_str) > 150: 442 args_str = args_str[:150] + "..." 443 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 444 except: 445 # Fallback to original format if parsing fails 446 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 447 elif chunk.message_type == 'tool_return_message': 448 # Enhanced tool result logging 449 tool_name = chunk.name 450 status = chunk.status 451 452 if status == 'success': 453 # Try to show meaningful result info based on tool type 454 if hasattr(chunk, 'tool_return') and chunk.tool_return: 455 result_str = str(chunk.tool_return) 456 if tool_name == 'archival_memory_search': 457 458 try: 459 # Handle both string and list formats 460 if isinstance(chunk.tool_return, str): 461 # The string format is: "([{...}, {...}], count)" 462 # We need to extract just the list part 463 if chunk.tool_return.strip(): 464 # Find the list part between the first [ and last ] 465 start_idx = chunk.tool_return.find('[') 466 end_idx = chunk.tool_return.rfind(']') 467 if start_idx != -1 and end_idx != -1: 468 list_str = chunk.tool_return[start_idx:end_idx+1] 469 # Use ast.literal_eval since this is Python literal syntax, not JSON 470 import ast 471 results = ast.literal_eval(list_str) 472 else: 473 logger.warning("Could not find list in archival_memory_search result") 474 results = [] 475 else: 476 logger.warning("Empty string returned from archival_memory_search") 477 results = [] 478 else: 479 # If it's already a list, use directly 480 results = chunk.tool_return 481 482 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 483 484 # Use the captured search query from the tool call 485 search_query = last_archival_query 486 487 # Combine all results into a single text block 488 content_text = "" 489 for i, entry in enumerate(results, 1): 490 timestamp = entry.get('timestamp', 'N/A') 491 content = entry.get('content', '') 492 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 493 494 # Format with Unicode characters 495 title = f"{search_query} ({len(results)} results)" 496 print(f"\n{title}") 497 print(f" {'' * len(title)}") 498 # Indent content text 499 for line in content_text.strip().split('\n'): 500 print(f" {line}") 501 502 except Exception as e: 503 logger.error(f"Error formatting archival memory results: {e}") 504 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 505 elif tool_name == 'add_post_to_bluesky_reply_thread': 506 # Just show success for bluesky posts, the text was already shown in tool call 507 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 508 elif tool_name == 'archival_memory_insert': 509 # Skip archival memory insert results (always returns None) 510 pass 511 elif tool_name == 'update_block': 512 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 513 else: 514 # Generic success with preview 515 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 516 log_with_panel(preview, f"Tool result: {tool_name}", "green") 517 else: 518 log_with_panel("Success", f"Tool result: {tool_name}", "green") 519 elif status == 'error': 520 # Show error details 521 if tool_name == 'add_post_to_bluesky_reply_thread': 522 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 523 log_with_panel(error_str, f"Bluesky Post ✗", "red") 524 elif tool_name == 'archival_memory_insert': 525 # Skip archival memory insert errors too 526 pass 527 else: 528 error_preview = "" 529 if hasattr(chunk, 'tool_return') and chunk.tool_return: 530 error_str = str(chunk.tool_return) 531 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 532 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 533 else: 534 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 535 else: 536 logger.info(f"Tool result: {tool_name} - {status}") 537 elif chunk.message_type == 'assistant_message': 538 # Format with Unicode characters 539 print("\n▶ Assistant Response") 540 print(" ──────────────────") 541 # Indent response text 542 for line in chunk.content.split('\n'): 543 print(f" {line}") 544 else: 545 # Filter out verbose message types 546 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 547 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 548 else: 549 logger.info(f"📦 Stream status: {chunk}") 550 551 # Log full chunk for debugging 552 logger.debug(f"Full streaming chunk: {chunk}") 553 all_messages.append(chunk) 554 if str(chunk) == 'done': 555 break 556 557 # Convert streaming response to standard format for compatibility 558 message_response = type('StreamingResponse', (), { 559 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 560 })() 561 except Exception as api_error: 562 import traceback 563 error_str = str(api_error) 564 logger.error(f"Letta API error: {api_error}") 565 logger.error(f"Error type: {type(api_error).__name__}") 566 logger.error(f"Full traceback:\n{traceback.format_exc()}") 567 logger.error(f"Mention text was: {mention_text}") 568 logger.error(f"Author: @{author_handle}") 569 logger.error(f"URI: {uri}") 570 571 572 # Try to extract more info from different error types 573 if hasattr(api_error, 'response'): 574 logger.error(f"Error response object exists") 575 if hasattr(api_error.response, 'text'): 576 logger.error(f"Response text: {api_error.response.text}") 577 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 578 try: 579 logger.error(f"Response JSON: {api_error.response.json()}") 580 except: 581 pass 582 583 # Check for specific error types 584 if hasattr(api_error, 'status_code'): 585 logger.error(f"API Status code: {api_error.status_code}") 586 if hasattr(api_error, 'body'): 587 logger.error(f"API Response body: {api_error.body}") 588 if hasattr(api_error, 'headers'): 589 logger.error(f"API Response headers: {api_error.headers}") 590 591 if api_error.status_code == 413: 592 logger.error("413 Payload Too Large - moving to errors directory") 593 return None # Move to errors directory - payload is too large to ever succeed 594 elif api_error.status_code == 524: 595 logger.error("524 error - timeout from Cloudflare, will retry later") 596 return False # Keep in queue for retry 597 598 # Check if error indicates we should remove from queue 599 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 600 logger.warning("Payload too large error, moving to errors directory") 601 return None # Move to errors directory - cannot be fixed by retry 602 elif 'status_code: 524' in error_str: 603 logger.warning("524 timeout error, keeping in queue for retry") 604 return False # Keep in queue for retry 605 606 raise 607 608 # Log successful response 609 logger.debug("Successfully received response from Letta API") 610 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 611 612 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 613 reply_candidates = [] 614 tool_call_results = {} # Map tool_call_id to status 615 ack_note = None # Track any note from annotate_ack tool 616 617 logger.debug(f"Processing {len(message_response.messages)} response messages...") 618 619 # First pass: collect tool return statuses 620 ignored_notification = False 621 ignore_reason = "" 622 ignore_category = "" 623 624 for message in message_response.messages: 625 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 626 if message.name == 'add_post_to_bluesky_reply_thread': 627 tool_call_results[message.tool_call_id] = message.status 628 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 629 elif message.name == 'ignore_notification': 630 # Check if the tool was successful 631 if hasattr(message, 'tool_return') and message.status == 'success': 632 # Parse the return value to extract category and reason 633 result_str = str(message.tool_return) 634 if 'IGNORED_NOTIFICATION::' in result_str: 635 parts = result_str.split('::') 636 if len(parts) >= 3: 637 ignore_category = parts[1] 638 ignore_reason = parts[2] 639 ignored_notification = True 640 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 641 elif message.name == 'bluesky_reply': 642 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 643 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 644 logger.error("Update the agent's tools using register_tools.py") 645 # Export agent state before terminating 646 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 647 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 648 exit(1) 649 650 # Second pass: process messages and check for successful tool calls 651 for i, message in enumerate(message_response.messages, 1): 652 # Log concise message info instead of full object 653 msg_type = getattr(message, 'message_type', 'unknown') 654 if hasattr(message, 'reasoning') and message.reasoning: 655 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 656 elif hasattr(message, 'tool_call') and message.tool_call: 657 tool_name = message.tool_call.name 658 logger.debug(f" {i}. {msg_type}: {tool_name}") 659 elif hasattr(message, 'tool_return'): 660 tool_name = getattr(message, 'name', 'unknown_tool') 661 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 662 status = getattr(message, 'status', 'unknown') 663 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 664 elif hasattr(message, 'text'): 665 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 666 else: 667 logger.debug(f" {i}. {msg_type}: <no content>") 668 669 # Check for halt_activity tool call 670 if hasattr(message, 'tool_call') and message.tool_call: 671 if message.tool_call.name == 'halt_activity': 672 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 673 try: 674 args = json.loads(message.tool_call.arguments) 675 reason = args.get('reason', 'Agent requested halt') 676 logger.info(f"Halt reason: {reason}") 677 except: 678 logger.info("Halt reason: <unable to parse>") 679 680 # Delete the queue file before terminating 681 if queue_filepath and queue_filepath.exists(): 682 queue_filepath.unlink() 683 logger.info(f"Deleted queue file: {queue_filepath.name}") 684 685 # Also mark as processed to avoid reprocessing 686 processed_uris = load_processed_notifications() 687 processed_uris.add(notification_data.get('uri', '')) 688 save_processed_notifications(processed_uris) 689 690 # Export agent state before terminating 691 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 692 693 # Exit the program 694 logger.info("=== BOT TERMINATED BY AGENT ===") 695 exit(0) 696 697 # Check for deprecated bluesky_reply tool 698 if hasattr(message, 'tool_call') and message.tool_call: 699 if message.tool_call.name == 'bluesky_reply': 700 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 701 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 702 logger.error("Update the agent's tools using register_tools.py") 703 # Export agent state before terminating 704 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 705 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 706 exit(1) 707 708 # Collect annotate_ack tool calls 709 elif message.tool_call.name == 'annotate_ack': 710 try: 711 args = json.loads(message.tool_call.arguments) 712 note = args.get('note', '') 713 if note: 714 ack_note = note 715 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 716 except json.JSONDecodeError as e: 717 logger.error(f"Failed to parse annotate_ack arguments: {e}") 718 719 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 720 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 721 tool_call_id = message.tool_call.tool_call_id 722 tool_status = tool_call_results.get(tool_call_id, 'unknown') 723 724 if tool_status == 'success': 725 try: 726 args = json.loads(message.tool_call.arguments) 727 reply_text = args.get('text', '') 728 reply_lang = args.get('lang', 'en-US') 729 730 if reply_text: # Only add if there's actual content 731 reply_candidates.append((reply_text, reply_lang)) 732 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 733 except json.JSONDecodeError as e: 734 logger.error(f"Failed to parse tool call arguments: {e}") 735 elif tool_status == 'error': 736 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 737 else: 738 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 739 740 # Check for conflicting tool calls 741 if reply_candidates and ignored_notification: 742 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 743 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 744 logger.warning("Item will be left in queue for manual review") 745 # Return False to keep in queue 746 return False 747 748 if reply_candidates: 749 # Aggregate reply posts into a thread 750 reply_messages = [] 751 reply_langs = [] 752 for text, lang in reply_candidates: 753 reply_messages.append(text) 754 reply_langs.append(lang) 755 756 # Use the first language for the entire thread (could be enhanced later) 757 reply_lang = reply_langs[0] if reply_langs else 'en-US' 758 759 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 760 761 # Display the generated reply thread 762 if len(reply_messages) == 1: 763 content = reply_messages[0] 764 title = f"Reply to @{author_handle}" 765 else: 766 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 767 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 768 769 # Format with Unicode characters 770 print(f"\n{title}") 771 print(f" {'' * len(title)}") 772 # Indent content lines 773 for line in content.split('\n'): 774 print(f" {line}") 775 776 # Send the reply(s) with language (unless in testing mode) 777 if testing_mode: 778 logger.info("TESTING MODE: Skipping actual Bluesky post") 779 response = True # Simulate success 780 else: 781 if len(reply_messages) == 1: 782 # Single reply - use existing function 783 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 784 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 785 response = bsky_utils.reply_to_notification( 786 client=atproto_client, 787 notification=notification_data, 788 reply_text=cleaned_text, 789 lang=reply_lang 790 ) 791 else: 792 # Multiple replies - use new threaded function 793 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 794 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 795 response = bsky_utils.reply_with_thread_to_notification( 796 client=atproto_client, 797 notification=notification_data, 798 reply_messages=cleaned_messages, 799 lang=reply_lang 800 ) 801 802 if response: 803 logger.info(f"Successfully replied to @{author_handle}") 804 805 # Acknowledge the post we're replying to with stream.thought.ack 806 try: 807 post_uri = notification_data.get('uri') 808 post_cid = notification_data.get('cid') 809 810 if post_uri and post_cid: 811 ack_result = bsky_utils.acknowledge_post( 812 client=atproto_client, 813 post_uri=post_uri, 814 post_cid=post_cid, 815 note=ack_note 816 ) 817 if ack_result: 818 if ack_note: 819 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 820 else: 821 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 822 else: 823 logger.warning(f"Failed to acknowledge post from @{author_handle}") 824 else: 825 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 826 except Exception as e: 827 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 828 # Don't fail the entire operation if acknowledgment fails 829 830 return True 831 else: 832 logger.error(f"Failed to send reply to @{author_handle}") 833 return False 834 else: 835 # Check if notification was explicitly ignored 836 if ignored_notification: 837 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") 838 return "ignored" 839 else: 840 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 841 return "no_reply" 842 843 except Exception as e: 844 logger.error(f"Error processing mention: {e}") 845 return False 846 finally: 847 # Detach user blocks after agent response (success or failure) 848 if 'attached_handles' in locals() and attached_handles: 849 try: 850 logger.info(f"Detaching user blocks for handles: {attached_handles}") 851 detach_result = detach_user_blocks(attached_handles, void_agent) 852 logger.debug(f"Detach result: {detach_result}") 853 except Exception as detach_error: 854 logger.warning(f"Failed to detach user blocks: {detach_error}") 855 856 857def notification_to_dict(notification): 858 """Convert a notification object to a dictionary for JSON serialization.""" 859 return { 860 'uri': notification.uri, 861 'cid': notification.cid, 862 'reason': notification.reason, 863 'is_read': notification.is_read, 864 'indexed_at': notification.indexed_at, 865 'author': { 866 'handle': notification.author.handle, 867 'display_name': notification.author.display_name, 868 'did': notification.author.did 869 }, 870 'record': { 871 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 872 } 873 } 874 875 876def load_processed_notifications(): 877 """Load the set of processed notification URIs.""" 878 if PROCESSED_NOTIFICATIONS_FILE.exists(): 879 try: 880 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 881 data = json.load(f) 882 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 883 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 884 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 885 save_processed_notifications(data) 886 return set(data) 887 except Exception as e: 888 logger.error(f"Error loading processed notifications: {e}") 889 return set() 890 891 892def save_processed_notifications(processed_set): 893 """Save the set of processed notification URIs.""" 894 try: 895 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 896 json.dump(list(processed_set), f) 897 except Exception as e: 898 logger.error(f"Error saving processed notifications: {e}") 899 900 901def save_notification_to_queue(notification, is_priority=None): 902 """Save a notification to the queue directory with priority-based filename.""" 903 try: 904 # Check if already processed 905 processed_uris = load_processed_notifications() 906 907 # Handle both notification objects and dicts 908 if isinstance(notification, dict): 909 notif_dict = notification 910 notification_uri = notification.get('uri') 911 else: 912 notif_dict = notification_to_dict(notification) 913 notification_uri = notification.uri 914 915 if notification_uri in processed_uris: 916 logger.debug(f"Notification already processed: {notification_uri}") 917 return False 918 919 # Create JSON string 920 notif_json = json.dumps(notif_dict, sort_keys=True) 921 922 # Generate hash for filename (to avoid duplicates) 923 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 924 925 # Determine priority based on author handle or explicit priority 926 if is_priority is not None: 927 priority_prefix = "0_" if is_priority else "1_" 928 else: 929 if isinstance(notification, dict): 930 author_handle = notification.get('author', {}).get('handle', '') 931 else: 932 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 933 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 934 935 # Create filename with priority, timestamp and hash 936 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 937 reason = notif_dict.get('reason', 'unknown') 938 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 939 filepath = QUEUE_DIR / filename 940 941 # Check if this notification URI is already in the queue 942 for existing_file in QUEUE_DIR.glob("*.json"): 943 if existing_file.name == "processed_notifications.json": 944 continue 945 try: 946 with open(existing_file, 'r') as f: 947 existing_data = json.load(f) 948 if existing_data.get('uri') == notification_uri: 949 logger.debug(f"Notification already queued (URI: {notification_uri})") 950 return False 951 except: 952 continue 953 954 # Write to file 955 with open(filepath, 'w') as f: 956 json.dump(notif_dict, f, indent=2) 957 958 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 959 logger.info(f"Queued notification ({priority_label}): {filename}") 960 return True 961 962 except Exception as e: 963 logger.error(f"Error saving notification to queue: {e}") 964 return False 965 966 967def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 968 """Load and process all notifications from the queue in priority order.""" 969 try: 970 # Get all JSON files in queue directory (excluding processed_notifications.json) 971 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 972 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 973 974 # Filter out and delete like notifications immediately 975 queue_files = [] 976 likes_deleted = 0 977 978 for filepath in all_queue_files: 979 try: 980 with open(filepath, 'r') as f: 981 notif_data = json.load(f) 982 983 # If it's a like, delete it immediately and don't process 984 if notif_data.get('reason') == 'like': 985 filepath.unlink() 986 likes_deleted += 1 987 logger.debug(f"Deleted like notification: {filepath.name}") 988 else: 989 queue_files.append(filepath) 990 except Exception as e: 991 logger.warning(f"Error checking notification file {filepath.name}: {e}") 992 queue_files.append(filepath) # Keep it in case it's valid 993 994 if likes_deleted > 0: 995 logger.info(f"Deleted {likes_deleted} like notifications from queue") 996 997 if not queue_files: 998 return 999 1000 logger.info(f"Processing {len(queue_files)} queued notifications") 1001 1002 # Log current statistics 1003 elapsed_time = time.time() - start_time 1004 total_messages = sum(message_counters.values()) 1005 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1006 1007 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1008 1009 for i, filepath in enumerate(queue_files, 1): 1010 # Check for new notifications periodically during queue processing 1011 if i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1: 1012 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1013 try: 1014 # Fetch and queue new notifications without processing them 1015 new_count = fetch_and_queue_new_notifications(atproto_client) 1016 1017 if new_count > 0: 1018 logger.info(f"Added {new_count} new notifications to queue") 1019 # Reload the queue files to include the new items 1020 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1021 queue_files = updated_queue_files 1022 logger.info(f"Queue updated: now {len(queue_files)} total items") 1023 except Exception as e: 1024 logger.error(f"Error checking for new notifications: {e}") 1025 1026 logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}") 1027 try: 1028 # Load notification data 1029 with open(filepath, 'r') as f: 1030 notif_data = json.load(f) 1031 1032 # Process based on type using dict data directly 1033 success = False 1034 if notif_data['reason'] == "mention": 1035 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1036 if success: 1037 message_counters['mentions'] += 1 1038 elif notif_data['reason'] == "reply": 1039 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1040 if success: 1041 message_counters['replies'] += 1 1042 elif notif_data['reason'] == "follow": 1043 author_handle = notif_data['author']['handle'] 1044 author_display_name = notif_data['author'].get('display_name', 'no display name') 1045 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1046 logger.info(f"Notifying agent about new follower: @{author_handle}") 1047 CLIENT.agents.messages.create( 1048 agent_id = void_agent.id, 1049 messages = [{"role":"user", "content": f"Update: {follow_update}"}] 1050 ) 1051 success = True # Follow updates are always successful 1052 if success: 1053 message_counters['follows'] += 1 1054 elif notif_data['reason'] == "repost": 1055 # Skip reposts silently 1056 success = True # Skip reposts but mark as successful to remove from queue 1057 if success: 1058 message_counters['reposts_skipped'] += 1 1059 elif notif_data['reason'] == "like": 1060 # Skip likes silently 1061 success = True # Skip likes but mark as successful to remove from queue 1062 if success: 1063 message_counters.setdefault('likes_skipped', 0) 1064 message_counters['likes_skipped'] += 1 1065 else: 1066 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1067 success = True # Remove unknown types from queue 1068 1069 # Handle file based on processing result 1070 if success: 1071 if testing_mode: 1072 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1073 else: 1074 filepath.unlink() 1075 logger.info(f"Successfully processed and removed: {filepath.name}") 1076 1077 # Mark as processed to avoid reprocessing 1078 processed_uris = load_processed_notifications() 1079 processed_uris.add(notif_data['uri']) 1080 save_processed_notifications(processed_uris) 1081 1082 elif success is None: # Special case for moving to error directory 1083 error_path = QUEUE_ERROR_DIR / filepath.name 1084 filepath.rename(error_path) 1085 logger.warning(f"Moved {filepath.name} to errors directory") 1086 1087 # Also mark as processed to avoid retrying 1088 processed_uris = load_processed_notifications() 1089 processed_uris.add(notif_data['uri']) 1090 save_processed_notifications(processed_uris) 1091 1092 elif success == "no_reply": # Special case for moving to no_reply directory 1093 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1094 filepath.rename(no_reply_path) 1095 logger.info(f"Moved {filepath.name} to no_reply directory") 1096 1097 # Also mark as processed to avoid retrying 1098 processed_uris = load_processed_notifications() 1099 processed_uris.add(notif_data['uri']) 1100 save_processed_notifications(processed_uris) 1101 1102 elif success == "ignored": # Special case for explicitly ignored notifications 1103 # For ignored notifications, we just delete them (not move to no_reply) 1104 filepath.unlink() 1105 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1106 1107 # Also mark as processed to avoid retrying 1108 processed_uris = load_processed_notifications() 1109 processed_uris.add(notif_data['uri']) 1110 save_processed_notifications(processed_uris) 1111 1112 else: 1113 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1114 1115 except Exception as e: 1116 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1117 # Keep the file for retry later 1118 1119 except Exception as e: 1120 logger.error(f"Error loading queued notifications: {e}") 1121 1122 1123def fetch_and_queue_new_notifications(atproto_client): 1124 """Fetch new notifications and queue them without processing.""" 1125 try: 1126 # Get current time for marking notifications as seen 1127 logger.debug("Getting current time for notification marking...") 1128 last_seen_at = atproto_client.get_current_time_iso() 1129 1130 # Fetch ALL notifications using pagination 1131 all_notifications = [] 1132 cursor = None 1133 page_count = 0 1134 max_pages = 20 # Safety limit to prevent infinite loops 1135 1136 while page_count < max_pages: 1137 try: 1138 # Fetch notifications page 1139 if cursor: 1140 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1141 params={'cursor': cursor, 'limit': 100} 1142 ) 1143 else: 1144 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1145 params={'limit': 100} 1146 ) 1147 1148 page_count += 1 1149 page_notifications = notifications_response.notifications 1150 1151 if not page_notifications: 1152 break 1153 1154 all_notifications.extend(page_notifications) 1155 1156 # Check if there are more pages 1157 cursor = getattr(notifications_response, 'cursor', None) 1158 if not cursor: 1159 break 1160 1161 except Exception as e: 1162 logger.error(f"Error fetching notifications page {page_count}: {e}") 1163 break 1164 1165 # Now process all fetched notifications 1166 new_count = 0 1167 if all_notifications: 1168 # Mark as seen first 1169 try: 1170 atproto_client.app.bsky.notification.update_seen( 1171 data={'seenAt': last_seen_at} 1172 ) 1173 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1174 except Exception as e: 1175 logger.error(f"Error marking notifications as seen: {e}") 1176 1177 # Queue all new notifications (except likes and already read) 1178 for notif in all_notifications: 1179 # Skip if already read or if it's a like 1180 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'): 1181 continue 1182 1183 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1184 1185 # Skip likes in dict form too 1186 if notif_dict.get('reason') == 'like': 1187 continue 1188 1189 # Check if it's a priority notification 1190 is_priority = False 1191 1192 # Priority for cameron.pfiffer.org notifications 1193 author_handle = notif_dict.get('author', {}).get('handle', '') 1194 if author_handle == "cameron.pfiffer.org": 1195 is_priority = True 1196 1197 # Also check for priority keywords in mentions 1198 if notif_dict.get('reason') == 'mention': 1199 # Get the mention text to check for priority keywords 1200 record = notif_dict.get('record', {}) 1201 text = record.get('text', '') 1202 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1203 is_priority = True 1204 1205 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1206 new_count += 1 1207 1208 logger.info(f"Queued {new_count} new notifications and marked as seen") 1209 else: 1210 logger.debug("No new notifications to queue") 1211 1212 return new_count 1213 1214 except Exception as e: 1215 logger.error(f"Error fetching and queueing notifications: {e}") 1216 return 0 1217 1218 1219def process_notifications(void_agent, atproto_client, testing_mode=False): 1220 """Fetch new notifications, queue them, and process the queue.""" 1221 try: 1222 # Fetch and queue new notifications 1223 new_count = fetch_and_queue_new_notifications(atproto_client) 1224 1225 if new_count > 0: 1226 logger.info(f"Found {new_count} new notifications to process") 1227 1228 # Now process the entire queue (old + new notifications) 1229 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1230 1231 except Exception as e: 1232 logger.error(f"Error processing notifications: {e}") 1233 1234 1235def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: 1236 """ 1237 Detach all user blocks from the agent to prevent memory bloat. 1238 This should be called periodically to ensure clean state. 1239 """ 1240 try: 1241 # Get all blocks attached to the agent 1242 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1243 1244 user_blocks_to_detach = [] 1245 for block in attached_blocks: 1246 if hasattr(block, 'label') and block.label.startswith('user_'): 1247 user_blocks_to_detach.append({ 1248 'label': block.label, 1249 'id': block.id 1250 }) 1251 1252 if not user_blocks_to_detach: 1253 logger.debug("No user blocks found to detach during periodic cleanup") 1254 return 1255 1256 # Detach each user block 1257 detached_count = 0 1258 for block_info in user_blocks_to_detach: 1259 try: 1260 client.agents.blocks.detach( 1261 agent_id=agent_id, 1262 block_id=str(block_info['id']) 1263 ) 1264 detached_count += 1 1265 logger.debug(f"Detached user block: {block_info['label']}") 1266 except Exception as e: 1267 logger.warning(f"Failed to detach block {block_info['label']}: {e}") 1268 1269 if detached_count > 0: 1270 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") 1271 1272 except Exception as e: 1273 logger.error(f"Error during periodic user block cleanup: {e}") 1274 1275 1276def main(): 1277 # Parse command line arguments 1278 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1279 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1280 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1281 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1282 # --rich option removed as we now use simple text formatting 1283 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1284 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1285 args = parser.parse_args() 1286 1287 # Configure logging based on command line arguments 1288 if args.simple_logs: 1289 log_format = "void - %(levelname)s - %(message)s" 1290 else: 1291 # Create custom formatter with symbols 1292 class SymbolFormatter(logging.Formatter): 1293 """Custom formatter that adds symbols for different log levels""" 1294 1295 SYMBOLS = { 1296 logging.DEBUG: '', 1297 logging.INFO: '', 1298 logging.WARNING: '', 1299 logging.ERROR: '', 1300 logging.CRITICAL: '' 1301 } 1302 1303 def format(self, record): 1304 # Get the symbol for this log level 1305 symbol = self.SYMBOLS.get(record.levelno, '') 1306 1307 # Format time as HH:MM:SS 1308 timestamp = self.formatTime(record, "%H:%M:%S") 1309 1310 # Build the formatted message 1311 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1312 1313 # Use vertical bar as separator 1314 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1315 1316 return ' '.join(parts) 1317 1318 # Reset logging configuration 1319 for handler in logging.root.handlers[:]: 1320 logging.root.removeHandler(handler) 1321 1322 # Create handler with custom formatter 1323 handler = logging.StreamHandler() 1324 if not args.simple_logs: 1325 handler.setFormatter(SymbolFormatter()) 1326 else: 1327 handler.setFormatter(logging.Formatter(log_format)) 1328 1329 # Configure root logger 1330 logging.root.setLevel(logging.INFO) 1331 logging.root.addHandler(handler) 1332 1333 global logger, prompt_logger, console 1334 logger = logging.getLogger("void_bot") 1335 logger.setLevel(logging.INFO) 1336 1337 # Create a separate logger for prompts (set to WARNING to hide by default) 1338 prompt_logger = logging.getLogger("void_bot.prompts") 1339 if args.reasoning: 1340 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1341 else: 1342 prompt_logger.setLevel(logging.WARNING) # Hide by default 1343 1344 # Disable httpx logging completely 1345 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1346 1347 # Create Rich console for pretty printing 1348 # Console no longer used - simple text formatting 1349 1350 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1351 TESTING_MODE = args.test 1352 1353 # Store no-git flag globally for use in export_agent_state calls 1354 SKIP_GIT = args.no_git 1355 1356 # Store rich flag globally 1357 # Rich formatting no longer used 1358 1359 # Store reasoning flag globally 1360 SHOW_REASONING = args.reasoning 1361 1362 if TESTING_MODE: 1363 logger.info("=== RUNNING IN TESTING MODE ===") 1364 logger.info(" - No messages will be sent to Bluesky") 1365 logger.info(" - Queue files will not be deleted") 1366 logger.info(" - Notifications will not be marked as seen") 1367 print("\n") 1368 """Main bot loop that continuously monitors for notifications.""" 1369 global start_time 1370 start_time = time.time() 1371 logger.info("=== STARTING VOID BOT ===") 1372 void_agent = initialize_void() 1373 logger.info(f"Void agent initialized: {void_agent.id}") 1374 1375 # Check if agent has required tools 1376 if hasattr(void_agent, 'tools') and void_agent.tools: 1377 tool_names = [tool.name for tool in void_agent.tools] 1378 # Check for bluesky-related tools 1379 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1380 if not bluesky_tools: 1381 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1382 else: 1383 logger.warning("Agent has no tools registered!") 1384 1385 # Initialize Bluesky client 1386 atproto_client = bsky_utils.default_login() 1387 logger.info("Connected to Bluesky") 1388 1389 # Main loop 1390 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1391 1392 cycle_count = 0 1393 CLEANUP_INTERVAL = args.cleanup_interval 1394 1395 if CLEANUP_INTERVAL > 0: 1396 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 1397 else: 1398 logger.info("User block cleanup disabled") 1399 1400 while True: 1401 try: 1402 cycle_count += 1 1403 process_notifications(void_agent, atproto_client, TESTING_MODE) 1404 1405 # Run periodic cleanup every N cycles 1406 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1407 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1408 periodic_user_block_cleanup(CLIENT, void_agent.id) 1409 1410 # Log cycle completion with stats 1411 elapsed_time = time.time() - start_time 1412 total_messages = sum(message_counters.values()) 1413 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1414 1415 if total_messages > 0: 1416 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1417 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1418 1419 except KeyboardInterrupt: 1420 # Final stats 1421 elapsed_time = time.time() - start_time 1422 total_messages = sum(message_counters.values()) 1423 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1424 1425 logger.info("=== BOT STOPPED BY USER ===") 1426 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1427 logger.info(f" - {message_counters['mentions']} mentions") 1428 logger.info(f" - {message_counters['replies']} replies") 1429 logger.info(f" - {message_counters['follows']} follows") 1430 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1431 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1432 break 1433 except Exception as e: 1434 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1435 logger.error(f"Error details: {e}") 1436 # Wait a bit longer on errors 1437 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1438 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1439 1440 1441if __name__ == "__main__": 1442 main()