a digital person for bluesky
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20 21import bsky_utils 22from tools.blocks import attach_user_blocks, detach_user_blocks 23from datetime import date 24 25def extract_handles_from_data(data): 26 """Recursively extract all unique handles from nested data structure.""" 27 handles = set() 28 29 def _extract_recursive(obj): 30 if isinstance(obj, dict): 31 # Check if this dict has a 'handle' key 32 if 'handle' in obj: 33 handles.add(obj['handle']) 34 # Recursively check all values 35 for value in obj.values(): 36 _extract_recursive(value) 37 elif isinstance(obj, list): 38 # Recursively check all list items 39 for item in obj: 40 _extract_recursive(item) 41 42 _extract_recursive(data) 43 return list(handles) 44 45# Logging will be configured after argument parsing 46logger = None 47prompt_logger = None 48# Simple text formatting (Rich no longer used) 49SHOW_REASONING = False 50last_archival_query = "archival memory search" 51 52def log_with_panel(message, title=None, border_color="white"): 53 """Log a message with Unicode box-drawing characters""" 54 if title: 55 # Map old color names to appropriate symbols 56 symbol_map = { 57 "blue": "", # Tool calls 58 "green": "", # Success/completion 59 "yellow": "", # Reasoning 60 "red": "", # Errors 61 "white": "", # Default/mentions 62 "cyan": "", # Posts 63 } 64 symbol = symbol_map.get(border_color, "") 65 66 print(f"\n{symbol} {title}") 67 print(f" {'' * len(title)}") 68 # Indent message lines 69 for line in message.split('\n'): 70 print(f" {line}") 71 else: 72 print(message) 73 74 75# Create a client with extended timeout for LLM operations 76CLIENT= Letta( 77 token=os.environ["LETTA_API_KEY"], 78 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 79) 80 81# Use the "Bluesky" project 82PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 83 84# Notification check delay 85FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 86 87# Check for new notifications every N queue items 88CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 89 90# Queue directory 91QUEUE_DIR = Path("queue") 92QUEUE_DIR.mkdir(exist_ok=True) 93QUEUE_ERROR_DIR = Path("queue/errors") 94QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 95QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 96QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 97PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 98 99# Maximum number of processed notifications to track 100MAX_PROCESSED_NOTIFICATIONS = 10000 101 102# Message tracking counters 103message_counters = defaultdict(int) 104start_time = time.time() 105 106# Testing mode flag 107TESTING_MODE = False 108 109# Skip git operations flag 110SKIP_GIT = False 111 112# Synthesis message tracking 113last_synthesis_time = time.time() 114 115def export_agent_state(client, agent, skip_git=False): 116 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 117 try: 118 # Confirm export with user unless git is being skipped 119 if not skip_git: 120 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 121 if response not in ['y', 'yes']: 122 logger.info("Agent export cancelled by user.") 123 return 124 else: 125 logger.info("Exporting agent state (git staging disabled)") 126 127 # Create directories if they don't exist 128 os.makedirs("agent_archive", exist_ok=True) 129 os.makedirs("agents", exist_ok=True) 130 131 # Export agent data 132 logger.info(f"Exporting agent {agent.id}. This takes some time...") 133 agent_data = client.agents.export_file(agent_id=agent.id) 134 135 # Save timestamped archive copy 136 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 137 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 138 with open(archive_file, 'w', encoding='utf-8') as f: 139 json.dump(agent_data, f, indent=2, ensure_ascii=False) 140 141 # Save current agent state 142 current_file = os.path.join("agents", "void.af") 143 with open(current_file, 'w', encoding='utf-8') as f: 144 json.dump(agent_data, f, indent=2, ensure_ascii=False) 145 146 logger.info(f"Agent exported to {archive_file} and {current_file}") 147 148 # Git add only the current agent file (archive is ignored) unless skip_git is True 149 if not skip_git: 150 try: 151 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 152 logger.info("Added current agent file to git staging") 153 except subprocess.CalledProcessError as e: 154 logger.warning(f"Failed to git add agent file: {e}") 155 156 except Exception as e: 157 logger.error(f"Failed to export agent: {e}") 158 159def initialize_void(): 160 logger.info("Starting void agent initialization...") 161 162 # Get the configured void agent by ID 163 logger.info("Loading void agent from config...") 164 from config_loader import get_letta_config 165 letta_config = get_letta_config() 166 agent_id = letta_config['agent_id'] 167 168 try: 169 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 170 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 171 except Exception as e: 172 logger.error(f"Failed to load void agent {agent_id}: {e}") 173 logger.error("Please ensure the agent_id in config.yaml is correct") 174 raise e 175 176 # Export agent state 177 logger.info("Exporting agent state...") 178 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 179 180 # Log agent details 181 logger.info(f"Void agent details - ID: {void_agent.id}") 182 logger.info(f"Agent name: {void_agent.name}") 183 if hasattr(void_agent, 'llm_config'): 184 logger.info(f"Agent model: {void_agent.llm_config.model}") 185 logger.info(f"Agent project_id: {void_agent.project_id}") 186 if hasattr(void_agent, 'tools'): 187 logger.info(f"Agent has {len(void_agent.tools)} tools") 188 for tool in void_agent.tools[:3]: # Show first 3 tools 189 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 190 191 return void_agent 192 193 194def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 195 """Process a mention and generate a reply using the Letta agent. 196 197 Args: 198 void_agent: The Letta agent instance 199 atproto_client: The AT Protocol client 200 notification_data: The notification data dictionary 201 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 202 203 Returns: 204 True: Successfully processed, remove from queue 205 False: Failed but retryable, keep in queue 206 None: Failed with non-retryable error, move to errors directory 207 "no_reply": No reply was generated, move to no_reply directory 208 """ 209 try: 210 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 211 212 # Handle both dict and object inputs for backwards compatibility 213 if isinstance(notification_data, dict): 214 uri = notification_data['uri'] 215 mention_text = notification_data.get('record', {}).get('text', '') 216 author_handle = notification_data['author']['handle'] 217 author_name = notification_data['author'].get('display_name') or author_handle 218 else: 219 # Legacy object access 220 uri = notification_data.uri 221 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 222 author_handle = notification_data.author.handle 223 author_name = notification_data.author.display_name or author_handle 224 225 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 226 227 # Retrieve the entire thread associated with the mention 228 try: 229 thread = atproto_client.app.bsky.feed.get_post_thread({ 230 'uri': uri, 231 'parent_height': 40, 232 'depth': 10 233 }) 234 except Exception as e: 235 error_str = str(e) 236 # Check if this is a NotFound error 237 if 'NotFound' in error_str or 'Post not found' in error_str: 238 logger.warning(f"Post not found for URI {uri}, removing from queue") 239 return True # Return True to remove from queue 240 else: 241 # Re-raise other errors 242 logger.error(f"Error fetching thread: {e}") 243 raise 244 245 # Get thread context as YAML string 246 logger.debug("Converting thread to YAML string") 247 try: 248 thread_context = thread_to_yaml_string(thread) 249 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 250 251 # Check if #voidstop appears anywhere in the thread 252 if "#voidstop" in thread_context.lower(): 253 logger.info("Found #voidstop in thread context, skipping this mention") 254 return True # Return True to remove from queue 255 256 # Also check the mention text directly 257 if "#voidstop" in mention_text.lower(): 258 logger.info("Found #voidstop in mention text, skipping this mention") 259 return True # Return True to remove from queue 260 261 # Create a more informative preview by extracting meaningful content 262 lines = thread_context.split('\n') 263 meaningful_lines = [] 264 265 for line in lines: 266 stripped = line.strip() 267 if not stripped: 268 continue 269 270 # Look for lines with actual content (not just structure) 271 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 272 meaningful_lines.append(line) 273 if len(meaningful_lines) >= 5: 274 break 275 276 if meaningful_lines: 277 preview = '\n'.join(meaningful_lines) 278 logger.debug(f"Thread content preview:\n{preview}") 279 else: 280 # If no content fields found, just show it's a thread structure 281 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 282 except Exception as yaml_error: 283 import traceback 284 logger.error(f"Error converting thread to YAML: {yaml_error}") 285 logger.error(f"Full traceback:\n{traceback.format_exc()}") 286 logger.error(f"Thread type: {type(thread)}") 287 if hasattr(thread, '__dict__'): 288 logger.error(f"Thread attributes: {thread.__dict__}") 289 # Try to continue with a simple context 290 thread_context = f"Error processing thread context: {str(yaml_error)}" 291 292 # Create a prompt for the Letta agent with thread context 293 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 294 295MOST RECENT POST (the mention you're responding to): 296"{mention_text}" 297 298FULL THREAD CONTEXT: 299```yaml 300{thread_context} 301``` 302 303The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 304 305To reply, use the add_post_to_bluesky_reply_thread tool: 306- Each call creates one post (max 300 characters) 307- For most responses, a single call is sufficient 308- Only use multiple calls for threaded replies when: 309 * The topic requires extended explanation that cannot fit in 300 characters 310 * You're explicitly asked for a detailed/long response 311 * The conversation naturally benefits from a structured multi-part answer 312- Avoid unnecessary threads - be concise when possible""" 313 314 # Extract all handles from notification and thread data 315 all_handles = set() 316 all_handles.update(extract_handles_from_data(notification_data)) 317 all_handles.update(extract_handles_from_data(thread.model_dump())) 318 unique_handles = list(all_handles) 319 320 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 321 322 # Check if any handles are in known_bots list 323 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 324 import json 325 326 try: 327 # Check for known bots in thread 328 bot_check_result = check_known_bots(unique_handles, void_agent) 329 bot_check_data = json.loads(bot_check_result) 330 331 if bot_check_data.get("bot_detected", False): 332 detected_bots = bot_check_data.get("detected_bots", []) 333 logger.info(f"Bot detected in thread: {detected_bots}") 334 335 # Decide whether to respond (10% chance) 336 if not should_respond_to_bot_thread(): 337 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 338 # Return False to keep in queue for potential later processing 339 return False 340 else: 341 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 342 else: 343 logger.debug("No known bots detected in thread") 344 345 except Exception as bot_check_error: 346 logger.warning(f"Error checking for bots: {bot_check_error}") 347 # Continue processing if bot check fails 348 349 # Attach user blocks before agent call 350 attached_handles = [] 351 if unique_handles: 352 try: 353 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 354 attach_result = attach_user_blocks(unique_handles, void_agent) 355 attached_handles = unique_handles # Track successfully attached handles 356 logger.debug(f"Attach result: {attach_result}") 357 except Exception as attach_error: 358 logger.warning(f"Failed to attach user blocks: {attach_error}") 359 # Continue without user blocks rather than failing completely 360 361 # Get response from Letta agent 362 # Format with Unicode characters 363 title = f"MENTION FROM @{author_handle}" 364 print(f"\n{title}") 365 print(f" {'' * len(title)}") 366 # Indent the mention text 367 for line in mention_text.split('\n'): 368 print(f" {line}") 369 370 # Log prompt details to separate logger 371 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 372 373 # Log concise prompt info to main logger 374 thread_handles_count = len(unique_handles) 375 prompt_char_count = len(prompt) 376 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 377 378 try: 379 # Use streaming to avoid 524 timeout errors 380 message_stream = CLIENT.agents.messages.create_stream( 381 agent_id=void_agent.id, 382 messages=[{"role": "user", "content": prompt}], 383 stream_tokens=False, # Step streaming only (faster than token streaming) 384 max_steps=100 385 ) 386 387 # Collect the streaming response 388 all_messages = [] 389 for chunk in message_stream: 390 # Log condensed chunk info 391 if hasattr(chunk, 'message_type'): 392 if chunk.message_type == 'reasoning_message': 393 # Show full reasoning without truncation 394 if SHOW_REASONING: 395 # Format with Unicode characters 396 print("\n◆ Reasoning") 397 print(" ─────────") 398 # Indent reasoning lines 399 for line in chunk.reasoning.split('\n'): 400 print(f" {line}") 401 else: 402 # Default log format (only when --reasoning is used due to log level) 403 # Format with Unicode characters 404 print("\n◆ Reasoning") 405 print(" ─────────") 406 # Indent reasoning lines 407 for line in chunk.reasoning.split('\n'): 408 print(f" {line}") 409 410 # Create ATProto record for reasoning (unless in testing mode) 411 if not testing_mode and hasattr(chunk, 'reasoning'): 412 try: 413 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 414 except Exception as e: 415 logger.debug(f"Failed to create reasoning record: {e}") 416 elif chunk.message_type == 'tool_call_message': 417 # Parse tool arguments for better display 418 tool_name = chunk.tool_call.name 419 420 # Create ATProto record for tool call (unless in testing mode) 421 if not testing_mode: 422 try: 423 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 424 bsky_utils.create_tool_call_record( 425 atproto_client, 426 tool_name, 427 chunk.tool_call.arguments, 428 tool_call_id 429 ) 430 except Exception as e: 431 logger.debug(f"Failed to create tool call record: {e}") 432 433 try: 434 args = json.loads(chunk.tool_call.arguments) 435 # Format based on tool type 436 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 437 # Extract the text being posted 438 text = args.get('text', '') 439 if text: 440 # Format with Unicode characters 441 print("\n✎ Bluesky Post") 442 print(" ────────────") 443 # Indent post text 444 for line in text.split('\n'): 445 print(f" {line}") 446 else: 447 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 448 elif tool_name == 'archival_memory_search': 449 query = args.get('query', 'unknown') 450 global last_archival_query 451 last_archival_query = query 452 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 453 elif tool_name == 'archival_memory_insert': 454 content = args.get('content', '') 455 # Show the full content being inserted 456 log_with_panel(content, f"Tool call: {tool_name}", "blue") 457 elif tool_name == 'update_block': 458 label = args.get('label', 'unknown') 459 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 460 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 461 else: 462 # Generic display for other tools 463 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 464 if len(args_str) > 150: 465 args_str = args_str[:150] + "..." 466 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 467 except: 468 # Fallback to original format if parsing fails 469 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 470 elif chunk.message_type == 'tool_return_message': 471 # Enhanced tool result logging 472 tool_name = chunk.name 473 status = chunk.status 474 475 if status == 'success': 476 # Try to show meaningful result info based on tool type 477 if hasattr(chunk, 'tool_return') and chunk.tool_return: 478 result_str = str(chunk.tool_return) 479 if tool_name == 'archival_memory_search': 480 481 try: 482 # Handle both string and list formats 483 if isinstance(chunk.tool_return, str): 484 # The string format is: "([{...}, {...}], count)" 485 # We need to extract just the list part 486 if chunk.tool_return.strip(): 487 # Find the list part between the first [ and last ] 488 start_idx = chunk.tool_return.find('[') 489 end_idx = chunk.tool_return.rfind(']') 490 if start_idx != -1 and end_idx != -1: 491 list_str = chunk.tool_return[start_idx:end_idx+1] 492 # Use ast.literal_eval since this is Python literal syntax, not JSON 493 import ast 494 results = ast.literal_eval(list_str) 495 else: 496 logger.warning("Could not find list in archival_memory_search result") 497 results = [] 498 else: 499 logger.warning("Empty string returned from archival_memory_search") 500 results = [] 501 else: 502 # If it's already a list, use directly 503 results = chunk.tool_return 504 505 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 506 507 # Use the captured search query from the tool call 508 search_query = last_archival_query 509 510 # Combine all results into a single text block 511 content_text = "" 512 for i, entry in enumerate(results, 1): 513 timestamp = entry.get('timestamp', 'N/A') 514 content = entry.get('content', '') 515 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 516 517 # Format with Unicode characters 518 title = f"{search_query} ({len(results)} results)" 519 print(f"\n{title}") 520 print(f" {'' * len(title)}") 521 # Indent content text 522 for line in content_text.strip().split('\n'): 523 print(f" {line}") 524 525 except Exception as e: 526 logger.error(f"Error formatting archival memory results: {e}") 527 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 528 elif tool_name == 'add_post_to_bluesky_reply_thread': 529 # Just show success for bluesky posts, the text was already shown in tool call 530 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 531 elif tool_name == 'archival_memory_insert': 532 # Skip archival memory insert results (always returns None) 533 pass 534 elif tool_name == 'update_block': 535 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 536 else: 537 # Generic success with preview 538 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 539 log_with_panel(preview, f"Tool result: {tool_name}", "green") 540 else: 541 log_with_panel("Success", f"Tool result: {tool_name}", "green") 542 elif status == 'error': 543 # Show error details 544 if tool_name == 'add_post_to_bluesky_reply_thread': 545 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 546 log_with_panel(error_str, f"Bluesky Post ✗", "red") 547 elif tool_name == 'archival_memory_insert': 548 # Skip archival memory insert errors too 549 pass 550 else: 551 error_preview = "" 552 if hasattr(chunk, 'tool_return') and chunk.tool_return: 553 error_str = str(chunk.tool_return) 554 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 555 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 556 else: 557 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 558 else: 559 logger.info(f"Tool result: {tool_name} - {status}") 560 elif chunk.message_type == 'assistant_message': 561 # Format with Unicode characters 562 print("\n▶ Assistant Response") 563 print(" ──────────────────") 564 # Indent response text 565 for line in chunk.content.split('\n'): 566 print(f" {line}") 567 else: 568 # Filter out verbose message types 569 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 570 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 571 else: 572 logger.info(f"📦 Stream status: {chunk}") 573 574 # Log full chunk for debugging 575 logger.debug(f"Full streaming chunk: {chunk}") 576 all_messages.append(chunk) 577 if str(chunk) == 'done': 578 break 579 580 # Convert streaming response to standard format for compatibility 581 message_response = type('StreamingResponse', (), { 582 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 583 })() 584 except Exception as api_error: 585 import traceback 586 error_str = str(api_error) 587 logger.error(f"Letta API error: {api_error}") 588 logger.error(f"Error type: {type(api_error).__name__}") 589 logger.error(f"Full traceback:\n{traceback.format_exc()}") 590 logger.error(f"Mention text was: {mention_text}") 591 logger.error(f"Author: @{author_handle}") 592 logger.error(f"URI: {uri}") 593 594 595 # Try to extract more info from different error types 596 if hasattr(api_error, 'response'): 597 logger.error(f"Error response object exists") 598 if hasattr(api_error.response, 'text'): 599 logger.error(f"Response text: {api_error.response.text}") 600 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 601 try: 602 logger.error(f"Response JSON: {api_error.response.json()}") 603 except: 604 pass 605 606 # Check for specific error types 607 if hasattr(api_error, 'status_code'): 608 logger.error(f"API Status code: {api_error.status_code}") 609 if hasattr(api_error, 'body'): 610 logger.error(f"API Response body: {api_error.body}") 611 if hasattr(api_error, 'headers'): 612 logger.error(f"API Response headers: {api_error.headers}") 613 614 if api_error.status_code == 413: 615 logger.error("413 Payload Too Large - moving to errors directory") 616 return None # Move to errors directory - payload is too large to ever succeed 617 elif api_error.status_code == 524: 618 logger.error("524 error - timeout from Cloudflare, will retry later") 619 return False # Keep in queue for retry 620 621 # Check if error indicates we should remove from queue 622 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 623 logger.warning("Payload too large error, moving to errors directory") 624 return None # Move to errors directory - cannot be fixed by retry 625 elif 'status_code: 524' in error_str: 626 logger.warning("524 timeout error, keeping in queue for retry") 627 return False # Keep in queue for retry 628 629 raise 630 631 # Log successful response 632 logger.debug("Successfully received response from Letta API") 633 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 634 635 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 636 reply_candidates = [] 637 tool_call_results = {} # Map tool_call_id to status 638 ack_note = None # Track any note from annotate_ack tool 639 640 logger.debug(f"Processing {len(message_response.messages)} response messages...") 641 642 # First pass: collect tool return statuses 643 ignored_notification = False 644 ignore_reason = "" 645 ignore_category = "" 646 647 for message in message_response.messages: 648 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 649 if message.name == 'add_post_to_bluesky_reply_thread': 650 tool_call_results[message.tool_call_id] = message.status 651 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 652 elif message.name == 'ignore_notification': 653 # Check if the tool was successful 654 if hasattr(message, 'tool_return') and message.status == 'success': 655 # Parse the return value to extract category and reason 656 result_str = str(message.tool_return) 657 if 'IGNORED_NOTIFICATION::' in result_str: 658 parts = result_str.split('::') 659 if len(parts) >= 3: 660 ignore_category = parts[1] 661 ignore_reason = parts[2] 662 ignored_notification = True 663 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 664 elif message.name == 'bluesky_reply': 665 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 666 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 667 logger.error("Update the agent's tools using register_tools.py") 668 # Export agent state before terminating 669 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 670 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 671 exit(1) 672 673 # Second pass: process messages and check for successful tool calls 674 for i, message in enumerate(message_response.messages, 1): 675 # Log concise message info instead of full object 676 msg_type = getattr(message, 'message_type', 'unknown') 677 if hasattr(message, 'reasoning') and message.reasoning: 678 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 679 elif hasattr(message, 'tool_call') and message.tool_call: 680 tool_name = message.tool_call.name 681 logger.debug(f" {i}. {msg_type}: {tool_name}") 682 elif hasattr(message, 'tool_return'): 683 tool_name = getattr(message, 'name', 'unknown_tool') 684 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 685 status = getattr(message, 'status', 'unknown') 686 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 687 elif hasattr(message, 'text'): 688 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 689 else: 690 logger.debug(f" {i}. {msg_type}: <no content>") 691 692 # Check for halt_activity tool call 693 if hasattr(message, 'tool_call') and message.tool_call: 694 if message.tool_call.name == 'halt_activity': 695 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 696 try: 697 args = json.loads(message.tool_call.arguments) 698 reason = args.get('reason', 'Agent requested halt') 699 logger.info(f"Halt reason: {reason}") 700 except: 701 logger.info("Halt reason: <unable to parse>") 702 703 # Delete the queue file before terminating 704 if queue_filepath and queue_filepath.exists(): 705 queue_filepath.unlink() 706 logger.info(f"Deleted queue file: {queue_filepath.name}") 707 708 # Also mark as processed to avoid reprocessing 709 processed_uris = load_processed_notifications() 710 processed_uris.add(notification_data.get('uri', '')) 711 save_processed_notifications(processed_uris) 712 713 # Export agent state before terminating 714 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 715 716 # Exit the program 717 logger.info("=== BOT TERMINATED BY AGENT ===") 718 exit(0) 719 720 # Check for deprecated bluesky_reply tool 721 if hasattr(message, 'tool_call') and message.tool_call: 722 if message.tool_call.name == 'bluesky_reply': 723 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 724 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 725 logger.error("Update the agent's tools using register_tools.py") 726 # Export agent state before terminating 727 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 728 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 729 exit(1) 730 731 # Collect annotate_ack tool calls 732 elif message.tool_call.name == 'annotate_ack': 733 try: 734 args = json.loads(message.tool_call.arguments) 735 note = args.get('note', '') 736 if note: 737 ack_note = note 738 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 739 except json.JSONDecodeError as e: 740 logger.error(f"Failed to parse annotate_ack arguments: {e}") 741 742 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 743 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 744 tool_call_id = message.tool_call.tool_call_id 745 tool_status = tool_call_results.get(tool_call_id, 'unknown') 746 747 if tool_status == 'success': 748 try: 749 args = json.loads(message.tool_call.arguments) 750 reply_text = args.get('text', '') 751 reply_lang = args.get('lang', 'en-US') 752 753 if reply_text: # Only add if there's actual content 754 reply_candidates.append((reply_text, reply_lang)) 755 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 756 except json.JSONDecodeError as e: 757 logger.error(f"Failed to parse tool call arguments: {e}") 758 elif tool_status == 'error': 759 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 760 else: 761 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 762 763 # Check for conflicting tool calls 764 if reply_candidates and ignored_notification: 765 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 766 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 767 logger.warning("Item will be left in queue for manual review") 768 # Return False to keep in queue 769 return False 770 771 if reply_candidates: 772 # Aggregate reply posts into a thread 773 reply_messages = [] 774 reply_langs = [] 775 for text, lang in reply_candidates: 776 reply_messages.append(text) 777 reply_langs.append(lang) 778 779 # Use the first language for the entire thread (could be enhanced later) 780 reply_lang = reply_langs[0] if reply_langs else 'en-US' 781 782 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 783 784 # Display the generated reply thread 785 if len(reply_messages) == 1: 786 content = reply_messages[0] 787 title = f"Reply to @{author_handle}" 788 else: 789 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 790 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 791 792 # Format with Unicode characters 793 print(f"\n{title}") 794 print(f" {'' * len(title)}") 795 # Indent content lines 796 for line in content.split('\n'): 797 print(f" {line}") 798 799 # Send the reply(s) with language (unless in testing mode) 800 if testing_mode: 801 logger.info("TESTING MODE: Skipping actual Bluesky post") 802 response = True # Simulate success 803 else: 804 if len(reply_messages) == 1: 805 # Single reply - use existing function 806 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 807 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 808 response = bsky_utils.reply_to_notification( 809 client=atproto_client, 810 notification=notification_data, 811 reply_text=cleaned_text, 812 lang=reply_lang 813 ) 814 else: 815 # Multiple replies - use new threaded function 816 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 817 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 818 response = bsky_utils.reply_with_thread_to_notification( 819 client=atproto_client, 820 notification=notification_data, 821 reply_messages=cleaned_messages, 822 lang=reply_lang 823 ) 824 825 if response: 826 logger.info(f"Successfully replied to @{author_handle}") 827 828 # Acknowledge the post we're replying to with stream.thought.ack 829 try: 830 post_uri = notification_data.get('uri') 831 post_cid = notification_data.get('cid') 832 833 if post_uri and post_cid: 834 ack_result = bsky_utils.acknowledge_post( 835 client=atproto_client, 836 post_uri=post_uri, 837 post_cid=post_cid, 838 note=ack_note 839 ) 840 if ack_result: 841 if ack_note: 842 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 843 else: 844 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 845 else: 846 logger.warning(f"Failed to acknowledge post from @{author_handle}") 847 else: 848 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 849 except Exception as e: 850 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 851 # Don't fail the entire operation if acknowledgment fails 852 853 return True 854 else: 855 logger.error(f"Failed to send reply to @{author_handle}") 856 return False 857 else: 858 # Check if notification was explicitly ignored 859 if ignored_notification: 860 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") 861 return "ignored" 862 else: 863 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 864 return "no_reply" 865 866 except Exception as e: 867 logger.error(f"Error processing mention: {e}") 868 return False 869 finally: 870 # Detach user blocks after agent response (success or failure) 871 if 'attached_handles' in locals() and attached_handles: 872 try: 873 logger.info(f"Detaching user blocks for handles: {attached_handles}") 874 detach_result = detach_user_blocks(attached_handles, void_agent) 875 logger.debug(f"Detach result: {detach_result}") 876 except Exception as detach_error: 877 logger.warning(f"Failed to detach user blocks: {detach_error}") 878 879 880def notification_to_dict(notification): 881 """Convert a notification object to a dictionary for JSON serialization.""" 882 return { 883 'uri': notification.uri, 884 'cid': notification.cid, 885 'reason': notification.reason, 886 'is_read': notification.is_read, 887 'indexed_at': notification.indexed_at, 888 'author': { 889 'handle': notification.author.handle, 890 'display_name': notification.author.display_name, 891 'did': notification.author.did 892 }, 893 'record': { 894 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 895 } 896 } 897 898 899def load_processed_notifications(): 900 """Load the set of processed notification URIs.""" 901 if PROCESSED_NOTIFICATIONS_FILE.exists(): 902 try: 903 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 904 data = json.load(f) 905 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 906 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 907 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 908 save_processed_notifications(data) 909 return set(data) 910 except Exception as e: 911 logger.error(f"Error loading processed notifications: {e}") 912 return set() 913 914 915def save_processed_notifications(processed_set): 916 """Save the set of processed notification URIs.""" 917 try: 918 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 919 json.dump(list(processed_set), f) 920 except Exception as e: 921 logger.error(f"Error saving processed notifications: {e}") 922 923 924def save_notification_to_queue(notification, is_priority=None): 925 """Save a notification to the queue directory with priority-based filename.""" 926 try: 927 # Check if already processed 928 processed_uris = load_processed_notifications() 929 930 # Handle both notification objects and dicts 931 if isinstance(notification, dict): 932 notif_dict = notification 933 notification_uri = notification.get('uri') 934 else: 935 notif_dict = notification_to_dict(notification) 936 notification_uri = notification.uri 937 938 if notification_uri in processed_uris: 939 logger.debug(f"Notification already processed: {notification_uri}") 940 return False 941 942 # Create JSON string 943 notif_json = json.dumps(notif_dict, sort_keys=True) 944 945 # Generate hash for filename (to avoid duplicates) 946 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 947 948 # Determine priority based on author handle or explicit priority 949 if is_priority is not None: 950 priority_prefix = "0_" if is_priority else "1_" 951 else: 952 if isinstance(notification, dict): 953 author_handle = notification.get('author', {}).get('handle', '') 954 else: 955 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 956 # Prioritize cameron.pfiffer.org responses 957 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 958 959 # Create filename with priority, timestamp and hash 960 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 961 reason = notif_dict.get('reason', 'unknown') 962 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 963 filepath = QUEUE_DIR / filename 964 965 # Check if this notification URI is already in the queue 966 for existing_file in QUEUE_DIR.glob("*.json"): 967 if existing_file.name == "processed_notifications.json": 968 continue 969 try: 970 with open(existing_file, 'r') as f: 971 existing_data = json.load(f) 972 if existing_data.get('uri') == notification_uri: 973 logger.debug(f"Notification already queued (URI: {notification_uri})") 974 return False 975 except: 976 continue 977 978 # Write to file 979 with open(filepath, 'w') as f: 980 json.dump(notif_dict, f, indent=2) 981 982 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 983 logger.info(f"Queued notification ({priority_label}): {filename}") 984 return True 985 986 except Exception as e: 987 logger.error(f"Error saving notification to queue: {e}") 988 return False 989 990 991def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False): 992 """Load and process all notifications from the queue in priority order.""" 993 try: 994 # Get all JSON files in queue directory (excluding processed_notifications.json) 995 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 996 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 997 998 # Filter out and delete like notifications immediately 999 queue_files = [] 1000 likes_deleted = 0 1001 1002 for filepath in all_queue_files: 1003 try: 1004 with open(filepath, 'r') as f: 1005 notif_data = json.load(f) 1006 1007 # If it's a like, delete it immediately and don't process 1008 if notif_data.get('reason') == 'like': 1009 filepath.unlink() 1010 likes_deleted += 1 1011 logger.debug(f"Deleted like notification: {filepath.name}") 1012 else: 1013 queue_files.append(filepath) 1014 except Exception as e: 1015 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1016 queue_files.append(filepath) # Keep it in case it's valid 1017 1018 if likes_deleted > 0: 1019 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1020 1021 if not queue_files: 1022 return 1023 1024 logger.info(f"Processing {len(queue_files)} queued notifications") 1025 1026 # Log current statistics 1027 elapsed_time = time.time() - start_time 1028 total_messages = sum(message_counters.values()) 1029 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1030 1031 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1032 1033 for i, filepath in enumerate(queue_files, 1): 1034 # Determine if this is a priority notification 1035 is_priority = filepath.name.startswith("0_") 1036 1037 # Check for new notifications periodically during queue processing 1038 # Also check immediately after processing each priority item 1039 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) 1040 1041 # If we just processed a priority item, immediately check for new priority notifications 1042 if is_priority and i > 1: 1043 should_check_notifications = True 1044 1045 if should_check_notifications: 1046 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1047 try: 1048 # Fetch and queue new notifications without processing them 1049 new_count = fetch_and_queue_new_notifications(atproto_client) 1050 1051 if new_count > 0: 1052 logger.info(f"Added {new_count} new notifications to queue") 1053 # Reload the queue files to include the new items 1054 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1055 queue_files = updated_queue_files 1056 logger.info(f"Queue updated: now {len(queue_files)} total items") 1057 except Exception as e: 1058 logger.error(f"Error checking for new notifications: {e}") 1059 1060 priority_label = " [PRIORITY]" if is_priority else "" 1061 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") 1062 try: 1063 # Load notification data 1064 with open(filepath, 'r') as f: 1065 notif_data = json.load(f) 1066 1067 # Process based on type using dict data directly 1068 success = False 1069 if notif_data['reason'] == "mention": 1070 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1071 if success: 1072 message_counters['mentions'] += 1 1073 elif notif_data['reason'] == "reply": 1074 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1075 if success: 1076 message_counters['replies'] += 1 1077 elif notif_data['reason'] == "follow": 1078 author_handle = notif_data['author']['handle'] 1079 author_display_name = notif_data['author'].get('display_name', 'no display name') 1080 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1081 follow_message = f"Update: {follow_update}" 1082 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") 1083 CLIENT.agents.messages.create( 1084 agent_id = void_agent.id, 1085 messages = [{"role":"user", "content": follow_message}] 1086 ) 1087 success = True # Follow updates are always successful 1088 if success: 1089 message_counters['follows'] += 1 1090 elif notif_data['reason'] == "repost": 1091 # Skip reposts silently 1092 success = True # Skip reposts but mark as successful to remove from queue 1093 if success: 1094 message_counters['reposts_skipped'] += 1 1095 elif notif_data['reason'] == "like": 1096 # Skip likes silently 1097 success = True # Skip likes but mark as successful to remove from queue 1098 if success: 1099 message_counters.setdefault('likes_skipped', 0) 1100 message_counters['likes_skipped'] += 1 1101 else: 1102 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1103 success = True # Remove unknown types from queue 1104 1105 # Handle file based on processing result 1106 if success: 1107 if testing_mode: 1108 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1109 else: 1110 filepath.unlink() 1111 logger.info(f"Successfully processed and removed: {filepath.name}") 1112 1113 # Mark as processed to avoid reprocessing 1114 processed_uris = load_processed_notifications() 1115 processed_uris.add(notif_data['uri']) 1116 save_processed_notifications(processed_uris) 1117 1118 elif success is None: # Special case for moving to error directory 1119 error_path = QUEUE_ERROR_DIR / filepath.name 1120 filepath.rename(error_path) 1121 logger.warning(f"Moved {filepath.name} to errors directory") 1122 1123 # Also mark as processed to avoid retrying 1124 processed_uris = load_processed_notifications() 1125 processed_uris.add(notif_data['uri']) 1126 save_processed_notifications(processed_uris) 1127 1128 elif success == "no_reply": # Special case for moving to no_reply directory 1129 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1130 filepath.rename(no_reply_path) 1131 logger.info(f"Moved {filepath.name} to no_reply directory") 1132 1133 # Also mark as processed to avoid retrying 1134 processed_uris = load_processed_notifications() 1135 processed_uris.add(notif_data['uri']) 1136 save_processed_notifications(processed_uris) 1137 1138 elif success == "ignored": # Special case for explicitly ignored notifications 1139 # For ignored notifications, we just delete them (not move to no_reply) 1140 filepath.unlink() 1141 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1142 1143 # Also mark as processed to avoid retrying 1144 processed_uris = load_processed_notifications() 1145 processed_uris.add(notif_data['uri']) 1146 save_processed_notifications(processed_uris) 1147 1148 else: 1149 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1150 1151 except Exception as e: 1152 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1153 # Keep the file for retry later 1154 1155 except Exception as e: 1156 logger.error(f"Error loading queued notifications: {e}") 1157 1158 1159def fetch_and_queue_new_notifications(atproto_client): 1160 """Fetch new notifications and queue them without processing.""" 1161 try: 1162 # Get current time for marking notifications as seen 1163 logger.debug("Getting current time for notification marking...") 1164 last_seen_at = atproto_client.get_current_time_iso() 1165 1166 # Fetch ALL notifications using pagination 1167 all_notifications = [] 1168 cursor = None 1169 page_count = 0 1170 max_pages = 20 # Safety limit to prevent infinite loops 1171 1172 while page_count < max_pages: 1173 try: 1174 # Fetch notifications page 1175 if cursor: 1176 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1177 params={'cursor': cursor, 'limit': 100} 1178 ) 1179 else: 1180 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1181 params={'limit': 100} 1182 ) 1183 1184 page_count += 1 1185 page_notifications = notifications_response.notifications 1186 1187 if not page_notifications: 1188 break 1189 1190 all_notifications.extend(page_notifications) 1191 1192 # Check if there are more pages 1193 cursor = getattr(notifications_response, 'cursor', None) 1194 if not cursor: 1195 break 1196 1197 except Exception as e: 1198 logger.error(f"Error fetching notifications page {page_count}: {e}") 1199 break 1200 1201 # Now process all fetched notifications 1202 new_count = 0 1203 if all_notifications: 1204 # Mark as seen first 1205 try: 1206 atproto_client.app.bsky.notification.update_seen( 1207 data={'seenAt': last_seen_at} 1208 ) 1209 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1210 except Exception as e: 1211 logger.error(f"Error marking notifications as seen: {e}") 1212 1213 # Queue all new notifications (except likes and already read) 1214 for notif in all_notifications: 1215 # Skip if already read or if it's a like 1216 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'): 1217 continue 1218 1219 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1220 1221 # Skip likes in dict form too 1222 if notif_dict.get('reason') == 'like': 1223 continue 1224 1225 # Check if it's a priority notification 1226 is_priority = False 1227 1228 # Priority for cameron.pfiffer.org notifications 1229 author_handle = notif_dict.get('author', {}).get('handle', '') 1230 if author_handle == "cameron.pfiffer.org": 1231 is_priority = True 1232 1233 # Also check for priority keywords in mentions 1234 if notif_dict.get('reason') == 'mention': 1235 # Get the mention text to check for priority keywords 1236 record = notif_dict.get('record', {}) 1237 text = record.get('text', '') 1238 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1239 is_priority = True 1240 1241 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1242 new_count += 1 1243 1244 logger.info(f"Queued {new_count} new notifications and marked as seen") 1245 else: 1246 logger.debug("No new notifications to queue") 1247 1248 return new_count 1249 1250 except Exception as e: 1251 logger.error(f"Error fetching and queueing notifications: {e}") 1252 return 0 1253 1254 1255def process_notifications(void_agent, atproto_client, testing_mode=False): 1256 """Fetch new notifications, queue them, and process the queue.""" 1257 try: 1258 # Fetch and queue new notifications 1259 new_count = fetch_and_queue_new_notifications(atproto_client) 1260 1261 if new_count > 0: 1262 logger.info(f"Found {new_count} new notifications to process") 1263 1264 # Now process the entire queue (old + new notifications) 1265 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode) 1266 1267 except Exception as e: 1268 logger.error(f"Error processing notifications: {e}") 1269 1270 1271def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None: 1272 """ 1273 Send a synthesis message to the agent every 10 minutes. 1274 This prompts the agent to synthesize its recent experiences. 1275 1276 Args: 1277 client: Letta client 1278 agent_id: Agent ID to send synthesis to 1279 atproto_client: Optional AT Protocol client for posting synthesis results 1280 """ 1281 # Track attached temporal blocks for cleanup 1282 attached_temporal_labels = [] 1283 1284 try: 1285 logger.info("🧠 Preparing synthesis with temporal journal blocks") 1286 1287 # Attach temporal blocks before synthesis 1288 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id) 1289 if not success: 1290 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway") 1291 1292 # Create enhanced synthesis prompt 1293 today = date.today() 1294 synthesis_prompt = f"""Time for synthesis and reflection. 1295 1296You have access to temporal journal blocks for recording your thoughts and experiences: 1297- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')}) 1298- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')}) 1299- void_year_{today.year}: This year's journal ({today.year}) 1300 1301These journal blocks are attached temporarily for this synthesis session. Use them to: 13021. Record significant interactions and insights from recent experiences 13032. Track patterns in conversations and user behaviors 13043. Note your evolving understanding of the digital social environment 13054. Reflect on your growth and changes in perspective 13065. Document memorable moments or interesting discoveries 1307 1308The journal entries should be cumulative - add to existing content rather than replacing it. 1309Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly). 1310 1311After recording in your journals, synthesize your recent experiences into your core memory blocks 1312(zeitgeist, void-persona, void-humans) as you normally would. 1313 1314Begin your synthesis and journaling now.""" 1315 1316 logger.info("🧠 Sending enhanced synthesis prompt to agent") 1317 1318 # Send synthesis message with streaming to show tool use 1319 message_stream = client.agents.messages.create_stream( 1320 agent_id=agent_id, 1321 messages=[{"role": "user", "content": synthesis_prompt}], 1322 stream_tokens=False, 1323 max_steps=100 1324 ) 1325 1326 # Track synthesis content for potential posting 1327 synthesis_posts = [] 1328 ack_note = None 1329 1330 # Process the streaming response 1331 for chunk in message_stream: 1332 if hasattr(chunk, 'message_type'): 1333 if chunk.message_type == 'reasoning_message': 1334 if SHOW_REASONING: 1335 print("\n◆ Reasoning") 1336 print(" ─────────") 1337 for line in chunk.reasoning.split('\n'): 1338 print(f" {line}") 1339 1340 # Create ATProto record for reasoning (if we have atproto client) 1341 if atproto_client and hasattr(chunk, 'reasoning'): 1342 try: 1343 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 1344 except Exception as e: 1345 logger.debug(f"Failed to create reasoning record during synthesis: {e}") 1346 elif chunk.message_type == 'tool_call_message': 1347 tool_name = chunk.tool_call.name 1348 1349 # Create ATProto record for tool call (if we have atproto client) 1350 if atproto_client: 1351 try: 1352 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 1353 bsky_utils.create_tool_call_record( 1354 atproto_client, 1355 tool_name, 1356 chunk.tool_call.arguments, 1357 tool_call_id 1358 ) 1359 except Exception as e: 1360 logger.debug(f"Failed to create tool call record during synthesis: {e}") 1361 try: 1362 args = json.loads(chunk.tool_call.arguments) 1363 if tool_name == 'archival_memory_search': 1364 query = args.get('query', 'unknown') 1365 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1366 elif tool_name == 'archival_memory_insert': 1367 content = args.get('content', '') 1368 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1369 elif tool_name == 'update_block': 1370 label = args.get('label', 'unknown') 1371 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1372 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1373 elif tool_name == 'annotate_ack': 1374 note = args.get('note', '') 1375 if note: 1376 ack_note = note 1377 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1378 elif tool_name == 'add_post_to_bluesky_reply_thread': 1379 text = args.get('text', '') 1380 synthesis_posts.append(text) 1381 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1382 else: 1383 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1384 if len(args_str) > 150: 1385 args_str = args_str[:150] + "..." 1386 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1387 except: 1388 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1389 elif chunk.message_type == 'tool_return_message': 1390 if chunk.status == 'success': 1391 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1392 else: 1393 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1394 elif chunk.message_type == 'assistant_message': 1395 print("\n▶ Synthesis Response") 1396 print(" ──────────────────") 1397 for line in chunk.content.split('\n'): 1398 print(f" {line}") 1399 1400 if str(chunk) == 'done': 1401 break 1402 1403 logger.info("🧠 Synthesis message processed successfully") 1404 1405 # Handle synthesis acknowledgments if we have an atproto client 1406 if atproto_client and ack_note: 1407 try: 1408 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1409 if result: 1410 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1411 else: 1412 logger.warning("Failed to create synthesis acknowledgment") 1413 except Exception as e: 1414 logger.error(f"Error creating synthesis acknowledgment: {e}") 1415 1416 # Handle synthesis posts if any were generated 1417 if atproto_client and synthesis_posts: 1418 try: 1419 for post_text in synthesis_posts: 1420 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1421 response = bsky_utils.send_post(atproto_client, cleaned_text) 1422 if response: 1423 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1424 else: 1425 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1426 except Exception as e: 1427 logger.error(f"Error posting synthesis content: {e}") 1428 1429 except Exception as e: 1430 logger.error(f"Error sending synthesis message: {e}") 1431 finally: 1432 # Always detach temporal blocks after synthesis 1433 if attached_temporal_labels: 1434 logger.info("🧠 Detaching temporal journal blocks after synthesis") 1435 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels) 1436 if not detach_success: 1437 logger.warning("Some temporal blocks may not have been detached properly") 1438 1439 1440def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: 1441 """ 1442 Detach all user blocks from the agent to prevent memory bloat. 1443 This should be called periodically to ensure clean state. 1444 """ 1445 try: 1446 # Get all blocks attached to the agent 1447 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1448 1449 user_blocks_to_detach = [] 1450 for block in attached_blocks: 1451 if hasattr(block, 'label') and block.label.startswith('user_'): 1452 user_blocks_to_detach.append({ 1453 'label': block.label, 1454 'id': block.id 1455 }) 1456 1457 if not user_blocks_to_detach: 1458 logger.debug("No user blocks found to detach during periodic cleanup") 1459 return 1460 1461 # Detach each user block 1462 detached_count = 0 1463 for block_info in user_blocks_to_detach: 1464 try: 1465 client.agents.blocks.detach( 1466 agent_id=agent_id, 1467 block_id=str(block_info['id']) 1468 ) 1469 detached_count += 1 1470 logger.debug(f"Detached user block: {block_info['label']}") 1471 except Exception as e: 1472 logger.warning(f"Failed to detach block {block_info['label']}: {e}") 1473 1474 if detached_count > 0: 1475 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") 1476 1477 except Exception as e: 1478 logger.error(f"Error during periodic user block cleanup: {e}") 1479 1480 1481def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple: 1482 """ 1483 Attach temporal journal blocks (day, month, year) to the agent for synthesis. 1484 Creates blocks if they don't exist. 1485 1486 Returns: 1487 Tuple of (success: bool, attached_labels: list) 1488 """ 1489 try: 1490 today = date.today() 1491 1492 # Generate temporal block labels 1493 day_label = f"void_day_{today.strftime('%Y_%m_%d')}" 1494 month_label = f"void_month_{today.strftime('%Y_%m')}" 1495 year_label = f"void_year_{today.year}" 1496 1497 temporal_labels = [day_label, month_label, year_label] 1498 attached_labels = [] 1499 1500 # Get current blocks attached to agent 1501 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1502 current_block_labels = {block.label for block in current_blocks} 1503 current_block_ids = {str(block.id) for block in current_blocks} 1504 1505 for label in temporal_labels: 1506 try: 1507 # Skip if already attached 1508 if label in current_block_labels: 1509 logger.debug(f"Temporal block already attached: {label}") 1510 attached_labels.append(label) 1511 continue 1512 1513 # Check if block exists globally 1514 blocks = client.blocks.list(label=label) 1515 1516 if blocks and len(blocks) > 0: 1517 block = blocks[0] 1518 # Check if already attached by ID 1519 if str(block.id) in current_block_ids: 1520 logger.debug(f"Temporal block already attached by ID: {label}") 1521 attached_labels.append(label) 1522 continue 1523 else: 1524 # Create new temporal block with appropriate header 1525 if "day" in label: 1526 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}" 1527 initial_content = f"{header}\n\nNo entries yet for today." 1528 elif "month" in label: 1529 header = f"# Monthly Journal - {today.strftime('%B %Y')}" 1530 initial_content = f"{header}\n\nNo entries yet for this month." 1531 else: # year 1532 header = f"# Yearly Journal - {today.year}" 1533 initial_content = f"{header}\n\nNo entries yet for this year." 1534 1535 block = client.blocks.create( 1536 label=label, 1537 value=initial_content, 1538 limit=10000 # Larger limit for journal blocks 1539 ) 1540 logger.info(f"Created new temporal block: {label}") 1541 1542 # Attach the block 1543 client.agents.blocks.attach( 1544 agent_id=agent_id, 1545 block_id=str(block.id) 1546 ) 1547 attached_labels.append(label) 1548 logger.info(f"Attached temporal block: {label}") 1549 1550 except Exception as e: 1551 # Check for duplicate constraint errors 1552 error_str = str(e) 1553 if "duplicate key value violates unique constraint" in error_str: 1554 logger.debug(f"Temporal block already attached (constraint): {label}") 1555 attached_labels.append(label) 1556 else: 1557 logger.warning(f"Failed to attach temporal block {label}: {e}") 1558 1559 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}") 1560 return True, attached_labels 1561 1562 except Exception as e: 1563 logger.error(f"Error attaching temporal blocks: {e}") 1564 return False, [] 1565 1566 1567def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool: 1568 """ 1569 Detach temporal journal blocks from the agent after synthesis. 1570 1571 Args: 1572 client: Letta client 1573 agent_id: Agent ID 1574 labels_to_detach: Optional list of specific labels to detach. 1575 If None, detaches all temporal blocks. 1576 1577 Returns: 1578 bool: Success status 1579 """ 1580 try: 1581 # If no specific labels provided, generate today's labels 1582 if labels_to_detach is None: 1583 today = date.today() 1584 labels_to_detach = [ 1585 f"void_day_{today.strftime('%Y_%m_%d')}", 1586 f"void_month_{today.strftime('%Y_%m')}", 1587 f"void_year_{today.year}" 1588 ] 1589 1590 # Get current blocks and build label to ID mapping 1591 current_blocks = client.agents.blocks.list(agent_id=agent_id) 1592 block_label_to_id = {block.label: str(block.id) for block in current_blocks} 1593 1594 detached_count = 0 1595 for label in labels_to_detach: 1596 if label in block_label_to_id: 1597 try: 1598 client.agents.blocks.detach( 1599 agent_id=agent_id, 1600 block_id=block_label_to_id[label] 1601 ) 1602 detached_count += 1 1603 logger.debug(f"Detached temporal block: {label}") 1604 except Exception as e: 1605 logger.warning(f"Failed to detach temporal block {label}: {e}") 1606 else: 1607 logger.debug(f"Temporal block not attached: {label}") 1608 1609 logger.info(f"Detached {detached_count} temporal blocks") 1610 return True 1611 1612 except Exception as e: 1613 logger.error(f"Error detaching temporal blocks: {e}") 1614 return False 1615 1616 1617def main(): 1618 # Parse command line arguments 1619 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent') 1620 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1621 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1622 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1623 # --rich option removed as we now use simple text formatting 1624 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1625 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1626 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1627 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1628 args = parser.parse_args() 1629 1630 # Configure logging based on command line arguments 1631 if args.simple_logs: 1632 log_format = "void - %(levelname)s - %(message)s" 1633 else: 1634 # Create custom formatter with symbols 1635 class SymbolFormatter(logging.Formatter): 1636 """Custom formatter that adds symbols for different log levels""" 1637 1638 SYMBOLS = { 1639 logging.DEBUG: '', 1640 logging.INFO: '', 1641 logging.WARNING: '', 1642 logging.ERROR: '', 1643 logging.CRITICAL: '' 1644 } 1645 1646 def format(self, record): 1647 # Get the symbol for this log level 1648 symbol = self.SYMBOLS.get(record.levelno, '') 1649 1650 # Format time as HH:MM:SS 1651 timestamp = self.formatTime(record, "%H:%M:%S") 1652 1653 # Build the formatted message 1654 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1655 1656 # Use vertical bar as separator 1657 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1658 1659 return ' '.join(parts) 1660 1661 # Reset logging configuration 1662 for handler in logging.root.handlers[:]: 1663 logging.root.removeHandler(handler) 1664 1665 # Create handler with custom formatter 1666 handler = logging.StreamHandler() 1667 if not args.simple_logs: 1668 handler.setFormatter(SymbolFormatter()) 1669 else: 1670 handler.setFormatter(logging.Formatter(log_format)) 1671 1672 # Configure root logger 1673 logging.root.setLevel(logging.INFO) 1674 logging.root.addHandler(handler) 1675 1676 global logger, prompt_logger, console 1677 logger = logging.getLogger("void_bot") 1678 logger.setLevel(logging.INFO) 1679 1680 # Create a separate logger for prompts (set to WARNING to hide by default) 1681 prompt_logger = logging.getLogger("void_bot.prompts") 1682 if args.reasoning: 1683 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1684 else: 1685 prompt_logger.setLevel(logging.WARNING) # Hide by default 1686 1687 # Disable httpx logging completely 1688 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1689 1690 # Create Rich console for pretty printing 1691 # Console no longer used - simple text formatting 1692 1693 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1694 TESTING_MODE = args.test 1695 1696 # Store no-git flag globally for use in export_agent_state calls 1697 SKIP_GIT = args.no_git 1698 1699 # Store rich flag globally 1700 # Rich formatting no longer used 1701 1702 # Store reasoning flag globally 1703 SHOW_REASONING = args.reasoning 1704 1705 if TESTING_MODE: 1706 logger.info("=== RUNNING IN TESTING MODE ===") 1707 logger.info(" - No messages will be sent to Bluesky") 1708 logger.info(" - Queue files will not be deleted") 1709 logger.info(" - Notifications will not be marked as seen") 1710 print("\n") 1711 1712 # Check for synthesis-only mode 1713 SYNTHESIS_ONLY = args.synthesis_only 1714 if SYNTHESIS_ONLY: 1715 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1716 logger.info(" - Only synthesis messages will be sent") 1717 logger.info(" - No notification processing") 1718 logger.info(" - No Bluesky client needed") 1719 print("\n") 1720 """Main bot loop that continuously monitors for notifications.""" 1721 global start_time 1722 start_time = time.time() 1723 logger.info("=== STARTING VOID BOT ===") 1724 void_agent = initialize_void() 1725 logger.info(f"Void agent initialized: {void_agent.id}") 1726 1727 # Ensure correct tools are attached for Bluesky 1728 logger.info("Configuring tools for Bluesky platform...") 1729 try: 1730 from tool_manager import ensure_platform_tools 1731 ensure_platform_tools('bluesky', void_agent.id) 1732 except Exception as e: 1733 logger.error(f"Failed to configure platform tools: {e}") 1734 logger.warning("Continuing with existing tool configuration") 1735 1736 # Check if agent has required tools 1737 if hasattr(void_agent, 'tools') and void_agent.tools: 1738 tool_names = [tool.name for tool in void_agent.tools] 1739 # Check for bluesky-related tools 1740 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1741 if not bluesky_tools: 1742 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1743 else: 1744 logger.warning("Agent has no tools registered!") 1745 1746 # Clean up all user blocks at startup 1747 logger.info("🧹 Cleaning up user blocks at startup...") 1748 periodic_user_block_cleanup(CLIENT, void_agent.id) 1749 1750 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1751 if not SYNTHESIS_ONLY: 1752 atproto_client = bsky_utils.default_login() 1753 logger.info("Connected to Bluesky") 1754 else: 1755 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 1756 if not args.test: 1757 atproto_client = bsky_utils.default_login() 1758 logger.info("Connected to Bluesky (for synthesis acks/posts)") 1759 else: 1760 atproto_client = None 1761 logger.info("Skipping Bluesky connection (test mode)") 1762 1763 # Configure intervals 1764 CLEANUP_INTERVAL = args.cleanup_interval 1765 SYNTHESIS_INTERVAL = args.synthesis_interval 1766 1767 # Synthesis-only mode 1768 if SYNTHESIS_ONLY: 1769 if SYNTHESIS_INTERVAL <= 0: 1770 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 1771 return 1772 1773 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1774 1775 while True: 1776 try: 1777 # Send synthesis message immediately on first run 1778 logger.info("🧠 Sending synthesis message") 1779 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1780 1781 # Wait for next interval 1782 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 1783 sleep(SYNTHESIS_INTERVAL) 1784 1785 except KeyboardInterrupt: 1786 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 1787 break 1788 except Exception as e: 1789 logger.error(f"Error in synthesis loop: {e}") 1790 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 1791 sleep(SYNTHESIS_INTERVAL) 1792 1793 # Normal mode with notification processing 1794 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1795 1796 cycle_count = 0 1797 1798 if CLEANUP_INTERVAL > 0: 1799 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 1800 else: 1801 logger.info("User block cleanup disabled") 1802 1803 if SYNTHESIS_INTERVAL > 0: 1804 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1805 else: 1806 logger.info("Synthesis messages disabled") 1807 1808 while True: 1809 try: 1810 cycle_count += 1 1811 process_notifications(void_agent, atproto_client, TESTING_MODE) 1812 1813 # Check if synthesis interval has passed 1814 if SYNTHESIS_INTERVAL > 0: 1815 current_time = time.time() 1816 global last_synthesis_time 1817 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 1818 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 1819 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1820 last_synthesis_time = current_time 1821 1822 # Run periodic cleanup every N cycles 1823 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1824 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1825 periodic_user_block_cleanup(CLIENT, void_agent.id) 1826 1827 # Log cycle completion with stats 1828 elapsed_time = time.time() - start_time 1829 total_messages = sum(message_counters.values()) 1830 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1831 1832 if total_messages > 0: 1833 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1834 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1835 1836 except KeyboardInterrupt: 1837 # Final stats 1838 elapsed_time = time.time() - start_time 1839 total_messages = sum(message_counters.values()) 1840 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1841 1842 logger.info("=== BOT STOPPED BY USER ===") 1843 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1844 logger.info(f" - {message_counters['mentions']} mentions") 1845 logger.info(f" - {message_counters['replies']} replies") 1846 logger.info(f" - {message_counters['follows']} follows") 1847 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1848 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1849 break 1850 except Exception as e: 1851 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1852 logger.error(f"Error details: {e}") 1853 # Wait a bit longer on errors 1854 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1855 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1856 1857 1858if __name__ == "__main__": 1859 main()