a digital person for bluesky
42
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8370a9ed49d97c2497b24a6973590e3a9f3143c4 1845 lines 91 kB view raw
# Rich imports removed - using simple text formatting
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time
import argparse

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils
from tools.blocks import attach_user_blocks, detach_user_blocks
from datetime import date


def extract_handles_from_data(data):
    """Recursively extract all unique handles from a nested data structure.

    Walks dicts and lists to any depth, collecting the value of every
    'handle' key encountered.

    Args:
        data: Arbitrarily nested dict/list structure (e.g. a notification
            payload or a thread dump).

    Returns:
        list: Unique handle values found anywhere in the structure
            (order is unspecified, since they are collected in a set).
    """
    handles = set()

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            # Check if this dict has a 'handle' key
            if 'handle' in obj:
                handles.add(obj['handle'])
            # Recursively check all values
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, list):
            # Recursively check all list items
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(handles)


# Logging will be configured after argument parsing
logger = None
prompt_logger = None
# Simple text formatting (Rich no longer used)
SHOW_REASONING = False
last_archival_query = "archival memory search"


def log_with_panel(message, title=None, border_color="white"):
    """Log a message with Unicode box-drawing characters.

    Prints a titled, indented panel to stdout; with no title the message
    is printed verbatim.

    Args:
        message: Text to display (may be multi-line).
        title: Optional panel title.
        border_color: Legacy Rich color name used to pick a symbol.
    """
    if title:
        # Map old color names to appropriate symbols
        # NOTE(review): these symbol strings appear in the scraped source as
        # empty strings -- the original glyphs were likely stripped by the
        # page scrape; confirm against the original file.
        symbol_map = {
            "blue": "",    # Tool calls
            "green": "",   # Success/completion
            "yellow": "",  # Reasoning
            "red": "",     # Errors
            "white": "",   # Default/mentions
            "cyan": "",    # Posts
        }
        symbol = symbol_map.get(border_color, "")

        print(f"\n{symbol} {title}")
        print(f" {'' * len(title)}")
        # Indent message lines
        for line in message.split('\n'):
            print(f" {line}")
    else:
        print(message)


# Create a client with extended timeout for LLM operations
76CLIENT= Letta( 77 token=os.environ["LETTA_API_KEY"], 78 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout 79) 80 81# Use the "Bluesky" project 82PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8" 83 84# Notification check delay 85FETCH_NOTIFICATIONS_DELAY_SEC = 30 86 87# Check for new notifications every N queue items 88CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 5 89 90# Queue directory 91QUEUE_DIR = Path("queue") 92QUEUE_DIR.mkdir(exist_ok=True) 93QUEUE_ERROR_DIR = Path("queue/errors") 94QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 95QUEUE_NO_REPLY_DIR = Path("queue/no_reply") 96QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 97PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 98 99# Maximum number of processed notifications to track 100MAX_PROCESSED_NOTIFICATIONS = 10000 101 102# Message tracking counters 103message_counters = defaultdict(int) 104start_time = time.time() 105 106# Testing mode flag 107TESTING_MODE = False 108 109# Skip git operations flag 110SKIP_GIT = False 111 112# Synthesis message tracking 113last_synthesis_time = time.time() 114 115def export_agent_state(client, agent, skip_git=False): 116 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 117 try: 118 # Confirm export with user unless git is being skipped 119 if not skip_git: 120 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 121 if response not in ['y', 'yes']: 122 logger.info("Agent export cancelled by user.") 123 return 124 else: 125 logger.info("Exporting agent state (git staging disabled)") 126 127 # Create directories if they don't exist 128 os.makedirs("agent_archive", exist_ok=True) 129 os.makedirs("agents", exist_ok=True) 130 131 # Export agent data 132 logger.info(f"Exporting agent {agent.id}. 
This takes some time...") 133 agent_data = client.agents.export_file(agent_id=agent.id) 134 135 # Save timestamped archive copy 136 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 137 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 138 with open(archive_file, 'w', encoding='utf-8') as f: 139 json.dump(agent_data, f, indent=2, ensure_ascii=False) 140 141 # Save current agent state 142 current_file = os.path.join("agents", "void.af") 143 with open(current_file, 'w', encoding='utf-8') as f: 144 json.dump(agent_data, f, indent=2, ensure_ascii=False) 145 146 logger.info(f"Agent exported to {archive_file} and {current_file}") 147 148 # Git add only the current agent file (archive is ignored) unless skip_git is True 149 if not skip_git: 150 try: 151 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 152 logger.info("Added current agent file to git staging") 153 except subprocess.CalledProcessError as e: 154 logger.warning(f"Failed to git add agent file: {e}") 155 156 except Exception as e: 157 logger.error(f"Failed to export agent: {e}") 158 159def initialize_void(): 160 logger.info("Starting void agent initialization...") 161 162 # Get the configured void agent by ID 163 logger.info("Loading void agent from config...") 164 from config_loader import get_letta_config 165 letta_config = get_letta_config() 166 agent_id = letta_config['agent_id'] 167 168 try: 169 void_agent = CLIENT.agents.retrieve(agent_id=agent_id) 170 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})") 171 except Exception as e: 172 logger.error(f"Failed to load void agent {agent_id}: {e}") 173 logger.error("Please ensure the agent_id in config.yaml is correct") 174 raise e 175 176 # Export agent state 177 logger.info("Exporting agent state...") 178 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 179 180 # Log agent details 181 logger.info(f"Void agent details - ID: {void_agent.id}") 182 logger.info(f"Agent name: 
{void_agent.name}") 183 if hasattr(void_agent, 'llm_config'): 184 logger.info(f"Agent model: {void_agent.llm_config.model}") 185 logger.info(f"Agent project_id: {void_agent.project_id}") 186 if hasattr(void_agent, 'tools'): 187 logger.info(f"Agent has {len(void_agent.tools)} tools") 188 for tool in void_agent.tools[:3]: # Show first 3 tools 189 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 190 191 return void_agent 192 193 194def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 195 """Process a mention and generate a reply using the Letta agent. 196 197 Args: 198 void_agent: The Letta agent instance 199 atproto_client: The AT Protocol client 200 notification_data: The notification data dictionary 201 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 202 203 Returns: 204 True: Successfully processed, remove from queue 205 False: Failed but retryable, keep in queue 206 None: Failed with non-retryable error, move to errors directory 207 "no_reply": No reply was generated, move to no_reply directory 208 """ 209 try: 210 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 211 212 # Handle both dict and object inputs for backwards compatibility 213 if isinstance(notification_data, dict): 214 uri = notification_data['uri'] 215 mention_text = notification_data.get('record', {}).get('text', '') 216 author_handle = notification_data['author']['handle'] 217 author_name = notification_data['author'].get('display_name') or author_handle 218 else: 219 # Legacy object access 220 uri = notification_data.uri 221 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 222 author_handle = notification_data.author.handle 223 author_name = notification_data.author.display_name or author_handle 224 225 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 
226 227 # Retrieve the entire thread associated with the mention 228 try: 229 thread = atproto_client.app.bsky.feed.get_post_thread({ 230 'uri': uri, 231 'parent_height': 40, 232 'depth': 10 233 }) 234 except Exception as e: 235 error_str = str(e) 236 # Check if this is a NotFound error 237 if 'NotFound' in error_str or 'Post not found' in error_str: 238 logger.warning(f"Post not found for URI {uri}, removing from queue") 239 return True # Return True to remove from queue 240 else: 241 # Re-raise other errors 242 logger.error(f"Error fetching thread: {e}") 243 raise 244 245 # Get thread context as YAML string 246 logger.debug("Converting thread to YAML string") 247 try: 248 thread_context = thread_to_yaml_string(thread) 249 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 250 251 # Check if #voidstop appears anywhere in the thread 252 if "#voidstop" in thread_context.lower(): 253 logger.info("Found #voidstop in thread context, skipping this mention") 254 return True # Return True to remove from queue 255 256 # Also check the mention text directly 257 if "#voidstop" in mention_text.lower(): 258 logger.info("Found #voidstop in mention text, skipping this mention") 259 return True # Return True to remove from queue 260 261 # Create a more informative preview by extracting meaningful content 262 lines = thread_context.split('\n') 263 meaningful_lines = [] 264 265 for line in lines: 266 stripped = line.strip() 267 if not stripped: 268 continue 269 270 # Look for lines with actual content (not just structure) 271 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 272 meaningful_lines.append(line) 273 if len(meaningful_lines) >= 5: 274 break 275 276 if meaningful_lines: 277 preview = '\n'.join(meaningful_lines) 278 logger.debug(f"Thread content preview:\n{preview}") 279 else: 280 # If no content fields found, just show it's a thread structure 281 
logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 282 except Exception as yaml_error: 283 import traceback 284 logger.error(f"Error converting thread to YAML: {yaml_error}") 285 logger.error(f"Full traceback:\n{traceback.format_exc()}") 286 logger.error(f"Thread type: {type(thread)}") 287 if hasattr(thread, '__dict__'): 288 logger.error(f"Thread attributes: {thread.__dict__}") 289 # Try to continue with a simple context 290 thread_context = f"Error processing thread context: {str(yaml_error)}" 291 292 # Create a prompt for the Letta agent with thread context 293 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 294 295MOST RECENT POST (the mention you're responding to): 296"{mention_text}" 297 298FULL THREAD CONTEXT: 299```yaml 300{thread_context} 301``` 302 303The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 
304 305To reply, use the add_post_to_bluesky_reply_thread tool: 306- Each call creates one post (max 300 characters) 307- For most responses, a single call is sufficient 308- Only use multiple calls for threaded replies when: 309 * The topic requires extended explanation that cannot fit in 300 characters 310 * You're explicitly asked for a detailed/long response 311 * The conversation naturally benefits from a structured multi-part answer 312- Avoid unnecessary threads - be concise when possible""" 313 314 # Extract all handles from notification and thread data 315 all_handles = set() 316 all_handles.update(extract_handles_from_data(notification_data)) 317 all_handles.update(extract_handles_from_data(thread.model_dump())) 318 unique_handles = list(all_handles) 319 320 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 321 322 # Check if any handles are in known_bots list 323 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 324 import json 325 326 try: 327 # Check for known bots in thread 328 bot_check_result = check_known_bots(unique_handles, void_agent) 329 bot_check_data = json.loads(bot_check_result) 330 331 if bot_check_data.get("bot_detected", False): 332 detected_bots = bot_check_data.get("detected_bots", []) 333 logger.info(f"Bot detected in thread: {detected_bots}") 334 335 # Decide whether to respond (10% chance) 336 if not should_respond_to_bot_thread(): 337 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 338 # Return False to keep in queue for potential later processing 339 return False 340 else: 341 logger.info(f"Responding to bot thread (10% response rate). 
Detected bots: {detected_bots}") 342 else: 343 logger.debug("No known bots detected in thread") 344 345 except Exception as bot_check_error: 346 logger.warning(f"Error checking for bots: {bot_check_error}") 347 # Continue processing if bot check fails 348 349 # Attach user blocks before agent call 350 attached_handles = [] 351 if unique_handles: 352 try: 353 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 354 attach_result = attach_user_blocks(unique_handles, void_agent) 355 attached_handles = unique_handles # Track successfully attached handles 356 logger.debug(f"Attach result: {attach_result}") 357 except Exception as attach_error: 358 logger.warning(f"Failed to attach user blocks: {attach_error}") 359 # Continue without user blocks rather than failing completely 360 361 # Get response from Letta agent 362 # Format with Unicode characters 363 title = f"MENTION FROM @{author_handle}" 364 print(f"\n{title}") 365 print(f" {'' * len(title)}") 366 # Indent the mention text 367 for line in mention_text.split('\n'): 368 print(f" {line}") 369 370 # Log prompt details to separate logger 371 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 372 373 # Log concise prompt info to main logger 374 thread_handles_count = len(unique_handles) 375 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users") 376 377 try: 378 # Use streaming to avoid 524 timeout errors 379 message_stream = CLIENT.agents.messages.create_stream( 380 agent_id=void_agent.id, 381 messages=[{"role": "user", "content": prompt}], 382 stream_tokens=False, # Step streaming only (faster than token streaming) 383 max_steps=100 384 ) 385 386 # Collect the streaming response 387 all_messages = [] 388 for chunk in message_stream: 389 # Log condensed chunk info 390 if hasattr(chunk, 'message_type'): 391 if chunk.message_type == 'reasoning_message': 392 # Show full reasoning without 
truncation 393 if SHOW_REASONING: 394 # Format with Unicode characters 395 print("\n◆ Reasoning") 396 print(" ─────────") 397 # Indent reasoning lines 398 for line in chunk.reasoning.split('\n'): 399 print(f" {line}") 400 else: 401 # Default log format (only when --reasoning is used due to log level) 402 # Format with Unicode characters 403 print("\n◆ Reasoning") 404 print(" ─────────") 405 # Indent reasoning lines 406 for line in chunk.reasoning.split('\n'): 407 print(f" {line}") 408 409 # Create ATProto record for reasoning (unless in testing mode) 410 if not testing_mode and hasattr(chunk, 'reasoning'): 411 try: 412 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 413 except Exception as e: 414 logger.debug(f"Failed to create reasoning record: {e}") 415 elif chunk.message_type == 'tool_call_message': 416 # Parse tool arguments for better display 417 tool_name = chunk.tool_call.name 418 419 # Create ATProto record for tool call (unless in testing mode) 420 if not testing_mode: 421 try: 422 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 423 bsky_utils.create_tool_call_record( 424 atproto_client, 425 tool_name, 426 chunk.tool_call.arguments, 427 tool_call_id 428 ) 429 except Exception as e: 430 logger.debug(f"Failed to create tool call record: {e}") 431 432 try: 433 args = json.loads(chunk.tool_call.arguments) 434 # Format based on tool type 435 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 436 # Extract the text being posted 437 text = args.get('text', '') 438 if text: 439 # Format with Unicode characters 440 print("\n✎ Bluesky Post") 441 print(" ────────────") 442 # Indent post text 443 for line in text.split('\n'): 444 print(f" {line}") 445 else: 446 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 447 elif tool_name == 'archival_memory_search': 448 query = args.get('query', 'unknown') 449 global last_archival_query 450 
last_archival_query = query 451 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 452 elif tool_name == 'archival_memory_insert': 453 content = args.get('content', '') 454 # Show the full content being inserted 455 log_with_panel(content, f"Tool call: {tool_name}", "blue") 456 elif tool_name == 'update_block': 457 label = args.get('label', 'unknown') 458 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 459 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 460 else: 461 # Generic display for other tools 462 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 463 if len(args_str) > 150: 464 args_str = args_str[:150] + "..." 465 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 466 except: 467 # Fallback to original format if parsing fails 468 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 469 elif chunk.message_type == 'tool_return_message': 470 # Enhanced tool result logging 471 tool_name = chunk.name 472 status = chunk.status 473 474 if status == 'success': 475 # Try to show meaningful result info based on tool type 476 if hasattr(chunk, 'tool_return') and chunk.tool_return: 477 result_str = str(chunk.tool_return) 478 if tool_name == 'archival_memory_search': 479 480 try: 481 # Handle both string and list formats 482 if isinstance(chunk.tool_return, str): 483 # The string format is: "([{...}, {...}], count)" 484 # We need to extract just the list part 485 if chunk.tool_return.strip(): 486 # Find the list part between the first [ and last ] 487 start_idx = chunk.tool_return.find('[') 488 end_idx = chunk.tool_return.rfind(']') 489 if start_idx != -1 and end_idx != -1: 490 list_str = chunk.tool_return[start_idx:end_idx+1] 491 # Use ast.literal_eval since this is Python literal syntax, not JSON 492 import ast 493 results = ast.literal_eval(list_str) 
494 else: 495 logger.warning("Could not find list in archival_memory_search result") 496 results = [] 497 else: 498 logger.warning("Empty string returned from archival_memory_search") 499 results = [] 500 else: 501 # If it's already a list, use directly 502 results = chunk.tool_return 503 504 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 505 506 # Use the captured search query from the tool call 507 search_query = last_archival_query 508 509 # Combine all results into a single text block 510 content_text = "" 511 for i, entry in enumerate(results, 1): 512 timestamp = entry.get('timestamp', 'N/A') 513 content = entry.get('content', '') 514 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 515 516 # Format with Unicode characters 517 title = f"{search_query} ({len(results)} results)" 518 print(f"\n{title}") 519 print(f" {'' * len(title)}") 520 # Indent content text 521 for line in content_text.strip().split('\n'): 522 print(f" {line}") 523 524 except Exception as e: 525 logger.error(f"Error formatting archival memory results: {e}") 526 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 527 elif tool_name == 'add_post_to_bluesky_reply_thread': 528 # Just show success for bluesky posts, the text was already shown in tool call 529 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 530 elif tool_name == 'archival_memory_insert': 531 # Skip archival memory insert results (always returns None) 532 pass 533 elif tool_name == 'update_block': 534 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 535 else: 536 # Generic success with preview 537 preview = result_str[:100] + "..." 
if len(result_str) > 100 else result_str 538 log_with_panel(preview, f"Tool result: {tool_name}", "green") 539 else: 540 log_with_panel("Success", f"Tool result: {tool_name}", "green") 541 elif status == 'error': 542 # Show error details 543 if tool_name == 'add_post_to_bluesky_reply_thread': 544 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 545 log_with_panel(error_str, f"Bluesky Post ✗", "red") 546 elif tool_name == 'archival_memory_insert': 547 # Skip archival memory insert errors too 548 pass 549 else: 550 error_preview = "" 551 if hasattr(chunk, 'tool_return') and chunk.tool_return: 552 error_str = str(chunk.tool_return) 553 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 554 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 555 else: 556 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 557 else: 558 logger.info(f"Tool result: {tool_name} - {status}") 559 elif chunk.message_type == 'assistant_message': 560 # Format with Unicode characters 561 print("\n▶ Assistant Response") 562 print(" ──────────────────") 563 # Indent response text 564 for line in chunk.content.split('\n'): 565 print(f" {line}") 566 else: 567 # Filter out verbose message types 568 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 569 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 570 else: 571 logger.info(f"📦 Stream status: {chunk}") 572 573 # Log full chunk for debugging 574 logger.debug(f"Full streaming chunk: {chunk}") 575 all_messages.append(chunk) 576 if str(chunk) == 'done': 577 break 578 579 # Convert streaming response to standard format for compatibility 580 message_response = type('StreamingResponse', (), { 581 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 582 })() 583 except Exception as api_error: 584 import traceback 585 error_str = str(api_error) 586 logger.error(f"Letta API 
error: {api_error}") 587 logger.error(f"Error type: {type(api_error).__name__}") 588 logger.error(f"Full traceback:\n{traceback.format_exc()}") 589 logger.error(f"Mention text was: {mention_text}") 590 logger.error(f"Author: @{author_handle}") 591 logger.error(f"URI: {uri}") 592 593 594 # Try to extract more info from different error types 595 if hasattr(api_error, 'response'): 596 logger.error(f"Error response object exists") 597 if hasattr(api_error.response, 'text'): 598 logger.error(f"Response text: {api_error.response.text}") 599 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 600 try: 601 logger.error(f"Response JSON: {api_error.response.json()}") 602 except: 603 pass 604 605 # Check for specific error types 606 if hasattr(api_error, 'status_code'): 607 logger.error(f"API Status code: {api_error.status_code}") 608 if hasattr(api_error, 'body'): 609 logger.error(f"API Response body: {api_error.body}") 610 if hasattr(api_error, 'headers'): 611 logger.error(f"API Response headers: {api_error.headers}") 612 613 if api_error.status_code == 413: 614 logger.error("413 Payload Too Large - moving to errors directory") 615 return None # Move to errors directory - payload is too large to ever succeed 616 elif api_error.status_code == 524: 617 logger.error("524 error - timeout from Cloudflare, will retry later") 618 return False # Keep in queue for retry 619 620 # Check if error indicates we should remove from queue 621 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 622 logger.warning("Payload too large error, moving to errors directory") 623 return None # Move to errors directory - cannot be fixed by retry 624 elif 'status_code: 524' in error_str: 625 logger.warning("524 timeout error, keeping in queue for retry") 626 return False # Keep in queue for retry 627 628 raise 629 630 # Log successful response 631 logger.debug("Successfully received response from Letta API") 632 logger.debug(f"Number of messages in 
response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 633 634 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 635 reply_candidates = [] 636 tool_call_results = {} # Map tool_call_id to status 637 ack_note = None # Track any note from annotate_ack tool 638 639 logger.debug(f"Processing {len(message_response.messages)} response messages...") 640 641 # First pass: collect tool return statuses 642 ignored_notification = False 643 ignore_reason = "" 644 ignore_category = "" 645 646 for message in message_response.messages: 647 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 648 if message.name == 'add_post_to_bluesky_reply_thread': 649 tool_call_results[message.tool_call_id] = message.status 650 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 651 elif message.name == 'ignore_notification': 652 # Check if the tool was successful 653 if hasattr(message, 'tool_return') and message.status == 'success': 654 # Parse the return value to extract category and reason 655 result_str = str(message.tool_return) 656 if 'IGNORED_NOTIFICATION::' in result_str: 657 parts = result_str.split('::') 658 if len(parts) >= 3: 659 ignore_category = parts[1] 660 ignore_reason = parts[2] 661 ignored_notification = True 662 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 663 elif message.name == 'bluesky_reply': 664 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 665 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 666 logger.error("Update the agent's tools using register_tools.py") 667 # Export agent state before terminating 668 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 669 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 670 exit(1) 671 672 # Second pass: process messages and check for successful tool calls 673 
for i, message in enumerate(message_response.messages, 1): 674 # Log concise message info instead of full object 675 msg_type = getattr(message, 'message_type', 'unknown') 676 if hasattr(message, 'reasoning') and message.reasoning: 677 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 678 elif hasattr(message, 'tool_call') and message.tool_call: 679 tool_name = message.tool_call.name 680 logger.debug(f" {i}. {msg_type}: {tool_name}") 681 elif hasattr(message, 'tool_return'): 682 tool_name = getattr(message, 'name', 'unknown_tool') 683 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 684 status = getattr(message, 'status', 'unknown') 685 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 686 elif hasattr(message, 'text'): 687 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 688 else: 689 logger.debug(f" {i}. {msg_type}: <no content>") 690 691 # Check for halt_activity tool call 692 if hasattr(message, 'tool_call') and message.tool_call: 693 if message.tool_call.name == 'halt_activity': 694 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 695 try: 696 args = json.loads(message.tool_call.arguments) 697 reason = args.get('reason', 'Agent requested halt') 698 logger.info(f"Halt reason: {reason}") 699 except: 700 logger.info("Halt reason: <unable to parse>") 701 702 # Delete the queue file before terminating 703 if queue_filepath and queue_filepath.exists(): 704 queue_filepath.unlink() 705 logger.info(f"Deleted queue file: {queue_filepath.name}") 706 707 # Also mark as processed to avoid reprocessing 708 processed_uris = load_processed_notifications() 709 processed_uris.add(notification_data.get('uri', '')) 710 save_processed_notifications(processed_uris) 711 712 # Export agent state before terminating 713 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 714 715 # Exit the program 716 logger.info("=== BOT TERMINATED BY AGENT ===") 717 exit(0) 718 719 # 
Check for deprecated bluesky_reply tool 720 if hasattr(message, 'tool_call') and message.tool_call: 721 if message.tool_call.name == 'bluesky_reply': 722 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 723 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 724 logger.error("Update the agent's tools using register_tools.py") 725 # Export agent state before terminating 726 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT) 727 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 728 exit(1) 729 730 # Collect annotate_ack tool calls 731 elif message.tool_call.name == 'annotate_ack': 732 try: 733 args = json.loads(message.tool_call.arguments) 734 note = args.get('note', '') 735 if note: 736 ack_note = note 737 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 738 except json.JSONDecodeError as e: 739 logger.error(f"Failed to parse annotate_ack arguments: {e}") 740 741 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 742 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 743 tool_call_id = message.tool_call.tool_call_id 744 tool_status = tool_call_results.get(tool_call_id, 'unknown') 745 746 if tool_status == 'success': 747 try: 748 args = json.loads(message.tool_call.arguments) 749 reply_text = args.get('text', '') 750 reply_lang = args.get('lang', 'en-US') 751 752 if reply_text: # Only add if there's actual content 753 reply_candidates.append((reply_text, reply_lang)) 754 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... 
(lang: {reply_lang})") 755 except json.JSONDecodeError as e: 756 logger.error(f"Failed to parse tool call arguments: {e}") 757 elif tool_status == 'error': 758 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 759 else: 760 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 761 762 # Check for conflicting tool calls 763 if reply_candidates and ignored_notification: 764 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 765 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 766 logger.warning("Item will be left in queue for manual review") 767 # Return False to keep in queue 768 return False 769 770 if reply_candidates: 771 # Aggregate reply posts into a thread 772 reply_messages = [] 773 reply_langs = [] 774 for text, lang in reply_candidates: 775 reply_messages.append(text) 776 reply_langs.append(lang) 777 778 # Use the first language for the entire thread (could be enhanced later) 779 reply_lang = reply_langs[0] if reply_langs else 'en-US' 780 781 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 782 783 # Display the generated reply thread 784 if len(reply_messages) == 1: 785 content = reply_messages[0] 786 title = f"Reply to @{author_handle}" 787 else: 788 content = "\n\n".join([f"{j}. 
{msg}" for j, msg in enumerate(reply_messages, 1)]) 789 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 790 791 # Format with Unicode characters 792 print(f"\n{title}") 793 print(f" {'' * len(title)}") 794 # Indent content lines 795 for line in content.split('\n'): 796 print(f" {line}") 797 798 # Send the reply(s) with language (unless in testing mode) 799 if testing_mode: 800 logger.info("TESTING MODE: Skipping actual Bluesky post") 801 response = True # Simulate success 802 else: 803 if len(reply_messages) == 1: 804 # Single reply - use existing function 805 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 806 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 807 response = bsky_utils.reply_to_notification( 808 client=atproto_client, 809 notification=notification_data, 810 reply_text=cleaned_text, 811 lang=reply_lang 812 ) 813 else: 814 # Multiple replies - use new threaded function 815 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 816 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 817 response = bsky_utils.reply_with_thread_to_notification( 818 client=atproto_client, 819 notification=notification_data, 820 reply_messages=cleaned_messages, 821 lang=reply_lang 822 ) 823 824 if response: 825 logger.info(f"Successfully replied to @{author_handle}") 826 827 # Acknowledge the post we're replying to with stream.thought.ack 828 try: 829 post_uri = notification_data.get('uri') 830 post_cid = notification_data.get('cid') 831 832 if post_uri and post_cid: 833 ack_result = bsky_utils.acknowledge_post( 834 client=atproto_client, 835 post_uri=post_uri, 836 post_cid=post_cid, 837 note=ack_note 838 ) 839 if ack_result: 840 if ack_note: 841 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 842 else: 843 logger.info(f"Successfully 
def notification_to_dict(notification):
    """Flatten a notification object into a JSON-serializable dictionary.

    Only the fields the queue pipeline relies on are copied: post
    identifiers, reason, read state, timestamp, author identity, and the
    record text (empty string when the notification carries no record).
    """
    author = notification.author

    # Notifications without a record (e.g. follows) serialize with empty text.
    record_text = ''
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')

    result = {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
    }
    result['author'] = {
        'handle': author.handle,
        'display_name': author.display_name,
        'did': author.did,
    }
    result['record'] = {'text': record_text}
    return result
def load_processed_notifications():
    """Load the set of processed notification URIs.

    Trims the persisted list to the most recent MAX_PROCESSED_NOTIFICATIONS
    entries (rewriting the file) so the dedup store cannot grow without
    bound. Returns an empty set when the file is missing or unreadable.
    """
    if PROCESSED_NOTIFICATIONS_FILE.exists():
        try:
            with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
                data = json.load(f)
            # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS)
            if len(data) > MAX_PROCESSED_NOTIFICATIONS:
                data = data[-MAX_PROCESSED_NOTIFICATIONS:]
                save_processed_notifications(data)
            return set(data)
        except Exception as e:
            logger.error(f"Error loading processed notifications: {e}")
    return set()


def save_processed_notifications(processed_set):
    """Persist the set of processed notification URIs as a JSON list."""
    try:
        with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed notifications: {e}")


def save_notification_to_queue(notification, is_priority=None):
    """Save a notification to the queue directory with priority-based filename.

    Args:
        notification: Notification object or an already-serialized dict.
        is_priority: Optional explicit priority override; when None, priority
            is derived from the author handle.

    Returns:
        bool: True when a new queue file was written; False when the
        notification was already processed/queued or an error occurred.
    """
    try:
        # Check if already processed
        processed_uris = load_processed_notifications()

        # Handle both notification objects and dicts
        if isinstance(notification, dict):
            notif_dict = notification
            notification_uri = notification.get('uri')
        else:
            notif_dict = notification_to_dict(notification)
            notification_uri = notification.uri

        if notification_uri in processed_uris:
            logger.debug(f"Notification already processed: {notification_uri}")
            return False

        # Stable JSON string so identical payloads hash identically
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # "0_" sorts before "1_", so priority items are processed first
        if is_priority is not None:
            priority_prefix = "0_" if is_priority else "1_"
        else:
            if isinstance(notification, dict):
                author_handle = notification.get('author', {}).get('handle', '')
            else:
                author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
            priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        # Create filename with priority, timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        reason = notif_dict.get('reason', 'unknown')
        filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Skip if this URI is already queued under another filename
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification_uri:
                    logger.debug(f"Notification already queued (URI: {notification_uri})")
                    return False
            except (OSError, json.JSONDecodeError):
                # Unreadable/corrupt queue files are skipped here.
                # FIX: was a bare `except:` which also swallowed KeyboardInterrupt.
                continue

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        # BUGFIX: log the actual queue filename (message previously contained a
        # literal "(unknown)" with no placeholder).
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
def _mark_notification_processed(uri):
    """Record a notification URI in the processed store so it is not retried."""
    processed_uris = load_processed_notifications()
    processed_uris.add(uri)
    save_processed_notifications(processed_uris)


def _dispose_queue_file(filepath, notif_data, success, testing_mode):
    """Delete/move one queue file according to its processing result.

    `success` may be True/False, None (unrecoverable error), or the strings
    "no_reply" / "ignored" returned by process_mention.

    BUGFIX: the original chain tested `if success:` first, so the truthy
    strings "no_reply" and "ignored" always took the plain-success branch and
    their dedicated branches were unreachable (no_reply items were deleted
    instead of archived). String results are now matched before generic
    truthiness.
    """
    if success == "no_reply":
        # Agent produced no reply: archive for manual inspection.
        no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
        filepath.rename(no_reply_path)
        logger.info(f"Moved {filepath.name} to no_reply directory")
        _mark_notification_processed(notif_data['uri'])
    elif success == "ignored":
        # Explicitly ignored notifications are simply deleted.
        filepath.unlink()
        logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
        _mark_notification_processed(notif_data['uri'])
    elif success is None:
        # Unrecoverable failure: park in the errors directory, do not retry.
        error_path = QUEUE_ERROR_DIR / filepath.name
        filepath.rename(error_path)
        logger.warning(f"Moved {filepath.name} to errors directory")
        _mark_notification_processed(notif_data['uri'])
    elif success:
        if testing_mode:
            # Testing mode keeps queue files and does not mark them processed,
            # so they can be replayed.
            logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
        else:
            filepath.unlink()
            logger.info(f"Successfully processed and removed: {filepath.name}")
            _mark_notification_processed(notif_data['uri'])
    else:
        logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")


def _process_queued_item(void_agent, atproto_client, notif_data, filepath, testing_mode):
    """Dispatch one queued notification by reason; return the processing result."""
    reason = notif_data['reason']
    if reason == "mention":
        success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
        if success:
            message_counters['mentions'] += 1
        return success
    if reason == "reply":
        success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
        if success:
            message_counters['replies'] += 1
        return success
    if reason == "follow":
        author_handle = notif_data['author']['handle']
        author_display_name = notif_data['author'].get('display_name', 'no display name')
        follow_update = f"@{author_handle} ({author_display_name}) started following you."
        logger.info(f"Notifying agent about new follower: @{author_handle}")
        CLIENT.agents.messages.create(
            agent_id=void_agent.id,
            messages=[{"role": "user", "content": f"Update: {follow_update}"}]
        )
        # Follow updates are always successful
        message_counters['follows'] += 1
        return True
    if reason == "repost":
        # Skip reposts silently, but mark successful so they leave the queue.
        message_counters['reposts_skipped'] += 1
        return True
    if reason == "like":
        # Likes are normally filtered earlier; skip any stragglers silently.
        message_counters.setdefault('likes_skipped', 0)
        message_counters['likes_skipped'] += 1
        return True
    logger.warning(f"Unknown notification type: {reason}")
    return True  # Remove unknown types from queue


def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
    """Load and process all notifications from the queue in priority order.

    Queue filenames are prefixed "0_" (priority) or "1_" (normal), so a plain
    lexicographic sort yields priority-first order. Like notifications are
    deleted up front; every few items the Bluesky inbox is re-polled so fresh
    notifications are queued for the next pass.
    """
    try:
        # Sorted by name => "0_" priority files come before "1_" files.
        all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])

        # Filter out and delete like notifications immediately.
        queue_files = []
        likes_deleted = 0
        for filepath in all_queue_files:
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)
                if notif_data.get('reason') == 'like':
                    filepath.unlink()
                    likes_deleted += 1
                    logger.debug(f"Deleted like notification: {filepath.name}")
                else:
                    queue_files.append(filepath)
            except Exception as e:
                logger.warning(f"Error checking notification file {filepath.name}: {e}")
                queue_files.append(filepath)  # Keep it in case it's valid

        if likes_deleted > 0:
            logger.info(f"Deleted {likes_deleted} like notifications from queue")

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Session statistics (start_time / message_counters are module globals).
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
        logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            # Check for new notifications periodically during queue processing.
            if i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1:
                logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
                try:
                    new_count = fetch_and_queue_new_notifications(atproto_client)
                    if new_count > 0:
                        logger.info(f"Added {new_count} new notifications to queue")
                        # NOTE(review): rebinding queue_files does not extend the
                        # already-running enumerate(); newly queued items are
                        # picked up on the next call — only the logged total
                        # reflects them here.
                        queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
                        logger.info(f"Queue updated: now {len(queue_files)} total items")
                except Exception as e:
                    logger.error(f"Error checking for new notifications: {e}")

            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                success = _process_queued_item(void_agent, atproto_client, notif_data, filepath, testing_mode)
                _dispose_queue_file(filepath, notif_data, success, testing_mode)
            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
def fetch_and_queue_new_notifications(atproto_client):
    """Fetch new notifications and queue them without processing.

    Pages through the notification listing (bounded by a safety cap), marks
    the batch as seen, then writes every unread, non-like notification into
    the on-disk queue. Returns the number of newly queued notifications.
    """
    try:
        # Timestamp captured up front so update_seen covers this whole fetch.
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        # --- Pagination: accumulate every available page ---
        fetched = []
        cursor = None
        pages_done = 0
        while pages_done < 20:  # safety cap against cursor loops
            params = {'limit': 100}
            if cursor:
                params['cursor'] = cursor
            try:
                page = atproto_client.app.bsky.notification.list_notifications(params=params)
                pages_done += 1
                batch = page.notifications
                if not batch:
                    break
                fetched.extend(batch)
                cursor = getattr(page, 'cursor', None)
                if not cursor:
                    break
            except Exception as e:
                logger.error(f"Error fetching notifications page {pages_done}: {e}")
                break

        if not fetched:
            logger.debug("No new notifications to queue")
            return 0

        # Mark the batch as seen before queueing.
        try:
            atproto_client.app.bsky.notification.update_seen(data={'seenAt': last_seen_at})
            logger.debug(f"Marked {len(fetched)} notifications as seen at {last_seen_at}")
        except Exception as e:
            logger.error(f"Error marking notifications as seen: {e}")

        # --- Queue everything that is unread and not a like ---
        new_count = 0
        for notif in fetched:
            if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'):
                continue

            payload = notif.model_dump() if hasattr(notif, 'model_dump') else notif

            # Likes can also arrive pre-serialized; drop those too.
            if payload.get('reason') == 'like':
                continue

            # Priority: the operator's handle, or urgent keywords in mentions.
            handle = payload.get('author', {}).get('handle', '')
            urgent = handle == "cameron.pfiffer.org"
            if not urgent and payload.get('reason') == 'mention':
                text = payload.get('record', {}).get('text', '')
                urgent = any(kw in text.lower() for kw in ['urgent', 'priority', 'important', 'emergency'])

            if save_notification_to_queue(payload, is_priority=urgent):
                new_count += 1

        logger.info(f"Queued {new_count} new notifications and marked as seen")
        return new_count

    except Exception as e:
        logger.error(f"Error fetching and queueing notifications: {e}")
        return 0


def process_notifications(void_agent, atproto_client, testing_mode=False):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        queued = fetch_and_queue_new_notifications(atproto_client)
        if queued > 0:
            logger.info(f"Found {queued} new notifications to process")
        # Process the entire queue (old + new notifications).
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
    """
    Send a synthesis message to the agent every 10 minutes.
    This prompts the agent to synthesize its recent experiences.

    Temporal journal blocks (day/month/year) are attached for the duration of
    the synthesis and always detached in the finally clause, even on error.

    Args:
        client: Letta client
        agent_id: Agent ID to send synthesis to
        atproto_client: Optional AT Protocol client for posting synthesis results
    """
    # Track attached temporal blocks for cleanup
    attached_temporal_labels = []

    try:
        logger.info("🧠 Preparing synthesis with temporal journal blocks")

        # Attach temporal blocks before synthesis
        success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
        if not success:
            logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")

        # Create enhanced synthesis prompt
        today = date.today()
        synthesis_prompt = f"""Time for synthesis and reflection.

You have access to temporal journal blocks for recording your thoughts and experiences:
- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
- void_year_{today.year}: This year's journal ({today.year})

These journal blocks are attached temporarily for this synthesis session. Use them to:
1. Record significant interactions and insights from recent experiences
2. Track patterns in conversations and user behaviors
3. Note your evolving understanding of the digital social environment
4. Reflect on your growth and changes in perspective
5. Document memorable moments or interesting discoveries

The journal entries should be cumulative - add to existing content rather than replacing it.
Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).

After recording in your journals, synthesize your recent experiences into your core memory blocks
(zeitgeist, void-persona, void-humans) as you normally would.

Begin your synthesis and journaling now."""

        logger.info("🧠 Sending enhanced synthesis prompt to agent")

        # Send synthesis message with streaming to show tool use
        message_stream = client.agents.messages.create_stream(
            agent_id=agent_id,
            messages=[{"role": "user", "content": synthesis_prompt}],
            stream_tokens=False,
            max_steps=100
        )

        # Track synthesis content for potential posting
        synthesis_posts = []
        ack_note = None

        # Process the streaming response chunk by chunk; each chunk type gets
        # its own console rendering and (optionally) an ATProto record.
        for chunk in message_stream:
            if hasattr(chunk, 'message_type'):
                if chunk.message_type == 'reasoning_message':
                    if SHOW_REASONING:
                        print("\n◆ Reasoning")
                        print(" ─────────")
                        for line in chunk.reasoning.split('\n'):
                            print(f" {line}")

                    # Create ATProto record for reasoning (if we have atproto client)
                    if atproto_client and hasattr(chunk, 'reasoning'):
                        try:
                            bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
                        except Exception as e:
                            logger.debug(f"Failed to create reasoning record during synthesis: {e}")
                elif chunk.message_type == 'tool_call_message':
                    tool_name = chunk.tool_call.name

                    # Create ATProto record for tool call (if we have atproto client)
                    if atproto_client:
                        try:
                            tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
                            bsky_utils.create_tool_call_record(
                                atproto_client,
                                tool_name,
                                chunk.tool_call.arguments,
                                tool_call_id
                            )
                        except Exception as e:
                            logger.debug(f"Failed to create tool call record during synthesis: {e}")
                    # Per-tool console summaries; arguments are JSON-decoded and
                    # previews truncated so long payloads don't flood the log.
                    try:
                        args = json.loads(chunk.tool_call.arguments)
                        if tool_name == 'archival_memory_search':
                            query = args.get('query', 'unknown')
                            log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'archival_memory_insert':
                            content = args.get('content', '')
                            log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'update_block':
                            label = args.get('label', 'unknown')
                            value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', ''))
                            log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'annotate_ack':
                            # Captured so a synthesis ack record can be created afterwards.
                            note = args.get('note', '')
                            if note:
                                ack_note = note
                            log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'add_post_to_bluesky_reply_thread':
                            # Collected and posted after the stream completes.
                            text = args.get('text', '')
                            synthesis_posts.append(text)
                            log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
                        else:
                            args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
                            if len(args_str) > 150:
                                args_str = args_str[:150] + "..."
                            log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                    except:
                        # NOTE(review): bare except — falls back to a raw-arguments
                        # preview when the JSON decode (or any rendering) fails.
                        log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
                elif chunk.message_type == 'tool_return_message':
                    if chunk.status == 'success':
                        log_with_panel("Success", f"Tool result: {chunk.name}", "green")
                    else:
                        log_with_panel("Error", f"Tool result: {chunk.name}", "red")
                elif chunk.message_type == 'assistant_message':
                    print("\n▶ Synthesis Response")
                    print(" ──────────────────")
                    for line in chunk.content.split('\n'):
                        print(f" {line}")

            # Stream terminator sentinel.
            if str(chunk) == 'done':
                break

        logger.info("🧠 Synthesis message processed successfully")

        # Handle synthesis acknowledgments if we have an atproto client
        if atproto_client and ack_note:
            try:
                result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
                if result:
                    logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
                else:
                    logger.warning("Failed to create synthesis acknowledgment")
            except Exception as e:
                logger.error(f"Error creating synthesis acknowledgment: {e}")

        # Handle synthesis posts if any were generated
        if atproto_client and synthesis_posts:
            try:
                for post_text in synthesis_posts:
                    cleaned_text = bsky_utils.remove_outside_quotes(post_text)
                    response = bsky_utils.send_post(atproto_client, cleaned_text)
                    if response:
                        logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
                    else:
                        logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
            except Exception as e:
                logger.error(f"Error posting synthesis content: {e}")

    except Exception as e:
        logger.error(f"Error sending synthesis message: {e}")
    finally:
        # Always detach temporal blocks after synthesis
        if attached_temporal_labels:
            logger.info("🧠 Detaching temporal journal blocks after synthesis")
            detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels)
            if not detach_success:
                logger.warning("Some temporal blocks may not have been detached properly")


def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
    """
    Detach all user blocks from the agent to prevent memory bloat.
    This should be called periodically to ensure clean state.

    Blocks are identified by the "user_" label prefix; failures to detach an
    individual block are logged and do not abort the sweep.
    """
    try:
        # Get all blocks attached to the agent
        attached_blocks = client.agents.blocks.list(agent_id=agent_id)

        user_blocks_to_detach = []
        for block in attached_blocks:
            if hasattr(block, 'label') and block.label.startswith('user_'):
                user_blocks_to_detach.append({
                    'label': block.label,
                    'id': block.id
                })

        if not user_blocks_to_detach:
            logger.debug("No user blocks found to detach during periodic cleanup")
            return

        # Detach each user block
        detached_count = 0
        for block_info in user_blocks_to_detach:
            try:
                client.agents.blocks.detach(
                    agent_id=agent_id,
                    block_id=str(block_info['id'])
                )
                detached_count += 1
                logger.debug(f"Detached user block: {block_info['label']}")
            except Exception as e:
                logger.warning(f"Failed to detach block {block_info['label']}: {e}")

        if detached_count > 0:
            logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")

    except Exception as e:
        logger.error(f"Error during periodic user block cleanup: {e}")
def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple:
    """
    Attach temporal journal blocks (day, month, year) to the agent for synthesis.
    Creates blocks if they don't exist.

    Returns:
        Tuple of (success: bool, attached_labels: list)
    """
    try:
        today = date.today()

        # (label, initial content used only when the block must be created)
        journal_specs = [
            (f"void_day_{today.strftime('%Y_%m_%d')}",
             f"# Daily Journal - {today.strftime('%B %d, %Y')}\n\nNo entries yet for today."),
            (f"void_month_{today.strftime('%Y_%m')}",
             f"# Monthly Journal - {today.strftime('%B %Y')}\n\nNo entries yet for this month."),
            (f"void_year_{today.year}",
             f"# Yearly Journal - {today.year}\n\nNo entries yet for this year."),
        ]

        attached_labels = []

        # Snapshot what is already attached, by label and by id.
        already_attached = client.agents.blocks.list(agent_id=agent_id)
        attached_label_set = {b.label for b in already_attached}
        attached_id_set = {str(b.id) for b in already_attached}

        for label, initial_content in journal_specs:
            try:
                if label in attached_label_set:
                    logger.debug(f"Temporal block already attached: {label}")
                    attached_labels.append(label)
                    continue

                # Reuse a globally existing block when available.
                existing = client.blocks.list(label=label)
                if existing and len(existing) > 0:
                    block = existing[0]
                    if str(block.id) in attached_id_set:
                        logger.debug(f"Temporal block already attached by ID: {label}")
                        attached_labels.append(label)
                        continue
                else:
                    block = client.blocks.create(
                        label=label,
                        value=initial_content,
                        limit=10000  # Larger limit for journal blocks
                    )
                    logger.info(f"Created new temporal block: {label}")

                client.agents.blocks.attach(
                    agent_id=agent_id,
                    block_id=str(block.id)
                )
                attached_labels.append(label)
                logger.info(f"Attached temporal block: {label}")

            except Exception as e:
                # A race can attach the block concurrently; treat the unique
                # constraint violation as "already attached".
                if "duplicate key value violates unique constraint" in str(e):
                    logger.debug(f"Temporal block already attached (constraint): {label}")
                    attached_labels.append(label)
                else:
                    logger.warning(f"Failed to attach temporal block {label}: {e}")

        logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(journal_specs)}")
        return True, attached_labels

    except Exception as e:
        logger.error(f"Error attaching temporal blocks: {e}")
        return False, []
def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool:
    """
    Detach temporal journal blocks from the agent after synthesis.

    Args:
        client: Letta client
        agent_id: Agent ID
        labels_to_detach: Optional list of specific labels to detach.
                          If None, detaches today's day/month/year blocks.

    Returns:
        bool: Success status
    """
    try:
        # Default to today's set of temporal labels when none are given.
        if labels_to_detach is None:
            today = date.today()
            labels_to_detach = [
                f"void_day_{today.strftime('%Y_%m_%d')}",
                f"void_month_{today.strftime('%Y_%m')}",
                f"void_year_{today.year}"
            ]

        # Map currently attached labels to their block IDs.
        label_to_id = {
            blk.label: str(blk.id)
            for blk in client.agents.blocks.list(agent_id=agent_id)
        }

        removed = 0
        for label in labels_to_detach:
            block_id = label_to_id.get(label)
            if block_id is None:
                logger.debug(f"Temporal block not attached: {label}")
                continue
            try:
                client.agents.blocks.detach(agent_id=agent_id, block_id=block_id)
                removed += 1
                logger.debug(f"Detached temporal block: {label}")
            except Exception as e:
                logger.warning(f"Failed to detach temporal block {label}: {e}")

        logger.info(f"Detached {removed} temporal blocks")
        return True

    except Exception as e:
        logger.error(f"Error detaching temporal blocks: {e}")
        return False
help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1612 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1613 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1614 args = parser.parse_args() 1615 1616 # Configure logging based on command line arguments 1617 if args.simple_logs: 1618 log_format = "void - %(levelname)s - %(message)s" 1619 else: 1620 # Create custom formatter with symbols 1621 class SymbolFormatter(logging.Formatter): 1622 """Custom formatter that adds symbols for different log levels""" 1623 1624 SYMBOLS = { 1625 logging.DEBUG: '', 1626 logging.INFO: '', 1627 logging.WARNING: '', 1628 logging.ERROR: '', 1629 logging.CRITICAL: '' 1630 } 1631 1632 def format(self, record): 1633 # Get the symbol for this log level 1634 symbol = self.SYMBOLS.get(record.levelno, '') 1635 1636 # Format time as HH:MM:SS 1637 timestamp = self.formatTime(record, "%H:%M:%S") 1638 1639 # Build the formatted message 1640 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1641 1642 # Use vertical bar as separator 1643 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1644 1645 return ' '.join(parts) 1646 1647 # Reset logging configuration 1648 for handler in logging.root.handlers[:]: 1649 logging.root.removeHandler(handler) 1650 1651 # Create handler with custom formatter 1652 handler = logging.StreamHandler() 1653 if not args.simple_logs: 1654 handler.setFormatter(SymbolFormatter()) 1655 else: 1656 handler.setFormatter(logging.Formatter(log_format)) 1657 1658 # Configure root logger 1659 logging.root.setLevel(logging.INFO) 1660 logging.root.addHandler(handler) 1661 1662 global logger, prompt_logger, console 1663 logger = logging.getLogger("void_bot") 1664 logger.setLevel(logging.INFO) 1665 1666 # Create a separate 
logger for prompts (set to WARNING to hide by default) 1667 prompt_logger = logging.getLogger("void_bot.prompts") 1668 if args.reasoning: 1669 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1670 else: 1671 prompt_logger.setLevel(logging.WARNING) # Hide by default 1672 1673 # Disable httpx logging completely 1674 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1675 1676 # Create Rich console for pretty printing 1677 # Console no longer used - simple text formatting 1678 1679 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1680 TESTING_MODE = args.test 1681 1682 # Store no-git flag globally for use in export_agent_state calls 1683 SKIP_GIT = args.no_git 1684 1685 # Store rich flag globally 1686 # Rich formatting no longer used 1687 1688 # Store reasoning flag globally 1689 SHOW_REASONING = args.reasoning 1690 1691 if TESTING_MODE: 1692 logger.info("=== RUNNING IN TESTING MODE ===") 1693 logger.info(" - No messages will be sent to Bluesky") 1694 logger.info(" - Queue files will not be deleted") 1695 logger.info(" - Notifications will not be marked as seen") 1696 print("\n") 1697 1698 # Check for synthesis-only mode 1699 SYNTHESIS_ONLY = args.synthesis_only 1700 if SYNTHESIS_ONLY: 1701 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1702 logger.info(" - Only synthesis messages will be sent") 1703 logger.info(" - No notification processing") 1704 logger.info(" - No Bluesky client needed") 1705 print("\n") 1706 """Main bot loop that continuously monitors for notifications.""" 1707 global start_time 1708 start_time = time.time() 1709 logger.info("=== STARTING VOID BOT ===") 1710 void_agent = initialize_void() 1711 logger.info(f"Void agent initialized: {void_agent.id}") 1712 1713 # Ensure correct tools are attached for Bluesky 1714 logger.info("Configuring tools for Bluesky platform...") 1715 try: 1716 from tool_manager import ensure_platform_tools 1717 ensure_platform_tools('bluesky', void_agent.id) 1718 except Exception as e: 
1719 logger.error(f"Failed to configure platform tools: {e}") 1720 logger.warning("Continuing with existing tool configuration") 1721 1722 # Check if agent has required tools 1723 if hasattr(void_agent, 'tools') and void_agent.tools: 1724 tool_names = [tool.name for tool in void_agent.tools] 1725 # Check for bluesky-related tools 1726 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1727 if not bluesky_tools: 1728 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1729 else: 1730 logger.warning("Agent has no tools registered!") 1731 1732 # Clean up all user blocks at startup 1733 logger.info("🧹 Cleaning up user blocks at startup...") 1734 periodic_user_block_cleanup(CLIENT, void_agent.id) 1735 1736 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1737 if not SYNTHESIS_ONLY: 1738 atproto_client = bsky_utils.default_login() 1739 logger.info("Connected to Bluesky") 1740 else: 1741 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 1742 if not args.test: 1743 atproto_client = bsky_utils.default_login() 1744 logger.info("Connected to Bluesky (for synthesis acks/posts)") 1745 else: 1746 atproto_client = None 1747 logger.info("Skipping Bluesky connection (test mode)") 1748 1749 # Configure intervals 1750 CLEANUP_INTERVAL = args.cleanup_interval 1751 SYNTHESIS_INTERVAL = args.synthesis_interval 1752 1753 # Synthesis-only mode 1754 if SYNTHESIS_ONLY: 1755 if SYNTHESIS_INTERVAL <= 0: 1756 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 1757 return 1758 1759 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1760 1761 while True: 1762 try: 1763 # Send synthesis message immediately on first run 1764 logger.info("🧠 Sending synthesis message") 1765 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1766 1767 # Wait for 
next interval 1768 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 1769 sleep(SYNTHESIS_INTERVAL) 1770 1771 except KeyboardInterrupt: 1772 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 1773 break 1774 except Exception as e: 1775 logger.error(f"Error in synthesis loop: {e}") 1776 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 1777 sleep(SYNTHESIS_INTERVAL) 1778 1779 # Normal mode with notification processing 1780 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1781 1782 cycle_count = 0 1783 1784 if CLEANUP_INTERVAL > 0: 1785 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 1786 else: 1787 logger.info("User block cleanup disabled") 1788 1789 if SYNTHESIS_INTERVAL > 0: 1790 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1791 else: 1792 logger.info("Synthesis messages disabled") 1793 1794 while True: 1795 try: 1796 cycle_count += 1 1797 process_notifications(void_agent, atproto_client, TESTING_MODE) 1798 1799 # Check if synthesis interval has passed 1800 if SYNTHESIS_INTERVAL > 0: 1801 current_time = time.time() 1802 global last_synthesis_time 1803 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 1804 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 1805 send_synthesis_message(CLIENT, void_agent.id, atproto_client) 1806 last_synthesis_time = current_time 1807 1808 # Run periodic cleanup every N cycles 1809 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1810 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1811 periodic_user_block_cleanup(CLIENT, void_agent.id) 1812 1813 # Log cycle completion with stats 1814 elapsed_time = time.time() - start_time 1815 total_messages = sum(message_counters.values()) 1816 messages_per_minute = (total_messages / elapsed_time * 60) if 
elapsed_time > 0 else 0 1817 1818 if total_messages > 0: 1819 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1820 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1821 1822 except KeyboardInterrupt: 1823 # Final stats 1824 elapsed_time = time.time() - start_time 1825 total_messages = sum(message_counters.values()) 1826 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1827 1828 logger.info("=== BOT STOPPED BY USER ===") 1829 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1830 logger.info(f" - {message_counters['mentions']} mentions") 1831 logger.info(f" - {message_counters['replies']} replies") 1832 logger.info(f" - {message_counters['follows']} follows") 1833 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1834 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1835 break 1836 except Exception as e: 1837 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1838 logger.error(f"Error details: {e}") 1839 # Wait a bit longer on errors 1840 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1841 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1842 1843 1844if __name__ == "__main__": 1845 main()