a digital person for bluesky
at x 95 kB view raw
1# Rich imports removed - using simple text formatting 2from time import sleep 3from letta_client import Letta 4from bsky_utils import thread_to_yaml_string 5import os 6import logging 7import json 8import hashlib 9import subprocess 10from pathlib import Path 11from datetime import datetime 12from collections import defaultdict 13import time 14import argparse 15 16from utils import ( 17 upsert_block, 18 upsert_agent 19) 20 21import bsky_utils 22from tools.blocks import attach_user_blocks, detach_user_blocks 23from datetime import date 24from temporal_blocks import ( 25 attach_temporal_blocks, 26 detach_temporal_blocks, 27 update_temporal_blocks_after_synthesis 28) 29 30def extract_handles_from_data(data): 31 """Recursively extract all unique handles from nested data structure.""" 32 handles = set() 33 34 def _extract_recursive(obj): 35 if isinstance(obj, dict): 36 # Check if this dict has a 'handle' key 37 if 'handle' in obj: 38 handles.add(obj['handle']) 39 # Recursively check all values 40 for value in obj.values(): 41 _extract_recursive(value) 42 elif isinstance(obj, list): 43 # Recursively check all list items 44 for item in obj: 45 _extract_recursive(item) 46 47 _extract_recursive(data) 48 return list(handles) 49 50# Logging will be configured after argument parsing 51logger = None 52prompt_logger = None 53# Simple text formatting (Rich no longer used) 54SHOW_REASONING = False 55last_archival_query = "archival memory search" 56 57def log_with_panel(message, title=None, border_color="white"): 58 """Log a message with Unicode box-drawing characters""" 59 if title: 60 # Map old color names to appropriate symbols 61 symbol_map = { 62 "blue": "", # Tool calls 63 "green": "", # Success/completion 64 "yellow": "", # Reasoning 65 "red": "", # Errors 66 "white": "", # Default/mentions 67 "cyan": "", # Posts 68 } 69 symbol = symbol_map.get(border_color, "") 70 71 print(f"\n{symbol} {title}") 72 print(f" {'' * len(title)}") 73 # Indent message lines 74 for line in message.split('\n'): 75 print(f" {line}") 76 else: 77 print(message) 78 79 80# Global variables for Letta client and config 81# These will be initialized in main() with the proper config file 82CLIENT = None 83PROJECT_ID = None 84 85# Notification check delay 86FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response 87 88# Check for new notifications every N queue items 89CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing 90 91# Queue directory - will be initialized with agent ID later 92QUEUE_DIR = None 93QUEUE_ERROR_DIR = None 94QUEUE_NO_REPLY_DIR = None 95PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json") 96 97# Maximum number of processed notifications to track 98MAX_PROCESSED_NOTIFICATIONS = 10000 99 100# Message tracking counters 101message_counters = defaultdict(int) 102start_time = time.time() 103 104# Testing mode flag 105TESTING_MODE = False 106 107# Skip git operations flag 108SKIP_GIT = False 109 110# Synthesis message tracking 111last_synthesis_time = time.time() 112 113def export_agent_state(client, agent, skip_git=False): 114 """Export agent state to agent_archive/ (timestamped) and agents/ (current).""" 115 try: 116 # Confirm export with user unless git is being skipped 117 if not skip_git: 118 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip() 119 if response not in ['y', 'yes']: 120 logger.info("Agent export cancelled by user.") 121 return 122 else: 123 logger.info("Exporting agent state (git staging disabled)") 124 125 # Create directories if they don't exist 126 os.makedirs("agent_archive", exist_ok=True) 127 os.makedirs("agents", exist_ok=True) 128 129 # Export agent data 130 logger.info(f"Exporting agent {agent.id}. This takes some time...") 131 agent_data = client.agents.export_file(agent_id=agent.id) 132 133 # Save timestamped archive copy 134 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 135 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af") 136 with open(archive_file, 'w', encoding='utf-8') as f: 137 json.dump(agent_data, f, indent=2, ensure_ascii=False) 138 139 # Save current agent state 140 current_file = os.path.join("agents", "void.af") 141 with open(current_file, 'w', encoding='utf-8') as f: 142 json.dump(agent_data, f, indent=2, ensure_ascii=False) 143 144 logger.info(f"Agent exported to {archive_file} and {current_file}") 145 146 # Git add only the current agent file (archive is ignored) unless skip_git is True 147 if not skip_git: 148 try: 149 subprocess.run(["git", "add", current_file], check=True, capture_output=True) 150 logger.info("Added current agent file to git staging") 151 except subprocess.CalledProcessError as e: 152 logger.warning(f"Failed to git add agent file: {e}") 153 154 except Exception as e: 155 logger.error(f"Failed to export agent: {e}") 156 157def initialize_void(config_path="config.yaml"): 158 logger.info("Starting agent initialization...") 159 160 # Get the configured agent by ID 161 logger.info("Loading agent from config...") 162 from config_loader import get_letta_config 163 letta_config = get_letta_config(config_path) 164 agent_id = letta_config['agent_id'] 165 166 try: 167 agent = CLIENT.agents.retrieve(agent_id=agent_id) 168 logger.info(f"Successfully loaded agent: {agent.name} ({agent_id})") 169 except Exception as e: 170 logger.error(f"Failed to load agent {agent_id}: {e}") 171 logger.error("Please ensure the agent_id in config.yaml is correct") 172 raise e 173 174 # Initialize agent-specific queue directories 175 global QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR 176 QUEUE_DIR = Path(f"queue/{agent_id}") 177 QUEUE_DIR.mkdir(exist_ok=True, parents=True) 178 QUEUE_ERROR_DIR = QUEUE_DIR / "errors" 179 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True) 180 QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply" 181 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True) 182 logger.info(f"Initialized queue directories for agent {agent_id}") 183 184 # Export agent state 185 logger.info("Exporting agent state...") 186 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 187 188 # Log agent details 189 logger.info(f"Agent details - ID: {agent.id}") 190 logger.info(f"Agent name: {agent.name}") 191 if hasattr(agent, 'llm_config'): 192 logger.info(f"Agent model: {agent.llm_config.model}") 193 logger.info(f"Agent project_id: {agent.project_id}") 194 if hasattr(agent, 'tools'): 195 logger.info(f"Agent has {len(agent.tools)} tools") 196 for tool in agent.tools[:3]: # Show first 3 tools 197 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})") 198 199 # Migrate old queue files if they exist 200 migrate_old_queue_files(agent_id) 201 202 return agent 203 204 205def migrate_old_queue_files(agent_id): 206 """Migrate queue files from old flat structure to agent-scoped directories.""" 207 old_queue_dir = Path("queue") 208 209 if not old_queue_dir.exists(): 210 return 211 212 # Count old queue files (excluding subdirectories and special files) 213 old_files = [f for f in old_queue_dir.glob("*.json") 214 if f.name != "processed_notifications.json"] 215 216 if not old_files: 217 return 218 219 logger.info(f"Found {len(old_files)} queue files in old location, migrating to agent-specific directory...") 220 221 migrated_count = 0 222 for old_file in old_files: 223 try: 224 # Read the notification to check if it's for this agent 225 with open(old_file, 'r') as f: 226 notif_data = json.load(f) 227 228 # Move to new agent-specific queue directory 229 new_file = QUEUE_DIR / old_file.name 230 old_file.rename(new_file) 231 migrated_count += 1 232 logger.debug(f"Migrated {old_file.name} to {new_file}") 233 234 except Exception as e: 235 logger.warning(f"Failed to migrate {old_file.name}: {e}") 236 237 if migrated_count > 0: 238 logger.info(f"Successfully migrated {migrated_count} queue files to agent-specific directory") 239 240 # Also check and migrate files from old errors and no_reply directories 241 for subdir in ["errors", "no_reply"]: 242 old_subdir = old_queue_dir / subdir 243 if old_subdir.exists(): 244 old_subdir_files = list(old_subdir.glob("*.json")) 245 if old_subdir_files: 246 new_subdir = QUEUE_DIR / subdir 247 new_subdir.mkdir(exist_ok=True, parents=True) 248 249 for old_file in old_subdir_files: 250 try: 251 new_file = new_subdir / old_file.name 252 old_file.rename(new_file) 253 logger.debug(f"Migrated {subdir}/{old_file.name}") 254 except Exception as e: 255 logger.warning(f"Failed to migrate {subdir}/{old_file.name}: {e}") 256 257 258def process_mention(agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): 259 """Process a mention and generate a reply using the Letta agent. 260 261 Args: 262 agent: The Letta agent instance 263 atproto_client: The AT Protocol client 264 notification_data: The notification data dictionary 265 queue_filepath: Optional Path object to the queue file (for cleanup on halt) 266 267 Returns: 268 True: Successfully processed, remove from queue 269 False: Failed but retryable, keep in queue 270 None: Failed with non-retryable error, move to errors directory 271 "no_reply": No reply was generated, move to no_reply directory 272 """ 273 try: 274 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") 275 276 # Handle both dict and object inputs for backwards compatibility 277 if isinstance(notification_data, dict): 278 uri = notification_data['uri'] 279 mention_text = notification_data.get('record', {}).get('text', '') 280 author_handle = notification_data['author']['handle'] 281 author_name = notification_data['author'].get('display_name') or author_handle 282 else: 283 # Legacy object access 284 uri = notification_data.uri 285 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" 286 author_handle = notification_data.author.handle 287 author_name = notification_data.author.display_name or author_handle 288 289 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") 290 291 # Retrieve the entire thread associated with the mention 292 try: 293 thread = atproto_client.app.bsky.feed.get_post_thread({ 294 'uri': uri, 295 'parent_height': 40, 296 'depth': 10 297 }) 298 except Exception as e: 299 error_str = str(e) 300 # Check if this is a NotFound error 301 if 'NotFound' in error_str or 'Post not found' in error_str: 302 logger.warning(f"Post not found for URI {uri}, removing from queue") 303 return True # Return True to remove from queue 304 else: 305 # Re-raise other errors 306 logger.error(f"Error fetching thread: {e}") 307 raise 308 309 # Get thread context as YAML string 310 logger.debug("Converting thread to YAML string") 311 try: 312 thread_context = thread_to_yaml_string(thread) 313 logger.debug(f"Thread context generated, length: {len(thread_context)} characters") 314 315 # Check if #voidstop appears anywhere in the thread 316 if "#voidstop" in thread_context.lower(): 317 logger.info("Found #voidstop in thread context, skipping this mention") 318 return True # Return True to remove from queue 319 320 # Also check the mention text directly 321 if "#voidstop" in mention_text.lower(): 322 logger.info("Found #voidstop in mention text, skipping this mention") 323 return True # Return True to remove from queue 324 325 # Create a more informative preview by extracting meaningful content 326 lines = thread_context.split('\n') 327 meaningful_lines = [] 328 329 for line in lines: 330 stripped = line.strip() 331 if not stripped: 332 continue 333 334 # Look for lines with actual content (not just structure) 335 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): 336 meaningful_lines.append(line) 337 if len(meaningful_lines) >= 5: 338 break 339 340 if meaningful_lines: 341 preview = '\n'.join(meaningful_lines) 342 logger.debug(f"Thread content preview:\n{preview}") 343 else: 344 # If no content fields found, just show it's a thread structure 345 logger.debug(f"Thread structure generated ({len(thread_context)} chars)") 346 except Exception as yaml_error: 347 import traceback 348 logger.error(f"Error converting thread to YAML: {yaml_error}") 349 logger.error(f"Full traceback:\n{traceback.format_exc()}") 350 logger.error(f"Thread type: {type(thread)}") 351 if hasattr(thread, '__dict__'): 352 logger.error(f"Thread attributes: {thread.__dict__}") 353 # Try to continue with a simple context 354 thread_context = f"Error processing thread context: {str(yaml_error)}" 355 356 # Create a prompt for the Letta agent with thread context 357 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). 358 359MOST RECENT POST (the mention you're responding to): 360"{mention_text}" 361 362FULL THREAD CONTEXT: 363```yaml 364{thread_context} 365``` 366 367The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. 368 369To reply, use the add_post_to_bluesky_reply_thread tool: 370- Each call creates one post (max 300 characters) 371- For most responses, a single call is sufficient 372- Only use multiple calls for threaded replies when: 373 * The topic requires extended explanation that cannot fit in 300 characters 374 * You're explicitly asked for a detailed/long response 375 * The conversation naturally benefits from a structured multi-part answer 376- Avoid unnecessary threads - be concise when possible""" 377 378 # Extract all handles from notification and thread data 379 all_handles = set() 380 all_handles.update(extract_handles_from_data(notification_data)) 381 all_handles.update(extract_handles_from_data(thread.model_dump())) 382 unique_handles = list(all_handles) 383 384 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") 385 386 # Check if any handles are in known_bots list 387 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs 388 import json 389 390 try: 391 # Check for known bots in thread 392 bot_check_result = check_known_bots(unique_handles, agent) 393 bot_check_data = json.loads(bot_check_result) 394 395 if bot_check_data.get("bot_detected", False): 396 detected_bots = bot_check_data.get("detected_bots", []) 397 logger.info(f"Bot detected in thread: {detected_bots}") 398 399 # Decide whether to respond (10% chance) 400 if not should_respond_to_bot_thread(): 401 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") 402 # Return False to keep in queue for potential later processing 403 return False 404 else: 405 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}") 406 else: 407 logger.debug("No known bots detected in thread") 408 409 except Exception as bot_check_error: 410 logger.warning(f"Error checking for bots: {bot_check_error}") 411 # Continue processing if bot check fails 412 413 # Attach user blocks before agent call 414 attached_handles = [] 415 if unique_handles: 416 try: 417 logger.debug(f"Attaching user blocks for handles: {unique_handles}") 418 attach_result = attach_user_blocks(unique_handles, agent) 419 attached_handles = unique_handles # Track successfully attached handles 420 logger.debug(f"Attach result: {attach_result}") 421 except Exception as attach_error: 422 logger.warning(f"Failed to attach user blocks: {attach_error}") 423 # Continue without user blocks rather than failing completely 424 425 # Get response from Letta agent 426 # Format with Unicode characters 427 title = f"MENTION FROM @{author_handle}" 428 print(f"\n{title}") 429 print(f" {'' * len(title)}") 430 # Indent the mention text 431 for line in mention_text.split('\n'): 432 print(f" {line}") 433 434 # Log prompt details to separate logger 435 prompt_logger.debug(f"Full prompt being sent:\n{prompt}") 436 437 # Log concise prompt info to main logger 438 thread_handles_count = len(unique_handles) 439 prompt_char_count = len(prompt) 440 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") 441 442 try: 443 # Use streaming to avoid 524 timeout errors 444 message_stream = CLIENT.agents.messages.create_stream( 445 agent_id=agent.id, 446 messages=[{"role": "user", "content": prompt}], 447 stream_tokens=False, # Step streaming only (faster than token streaming) 448 max_steps=100 449 ) 450 451 # Collect the streaming response 452 all_messages = [] 453 for chunk in message_stream: 454 # Log condensed chunk info 455 if hasattr(chunk, 'message_type'): 456 if chunk.message_type == 'reasoning_message': 457 # Show full reasoning without truncation 458 if SHOW_REASONING: 459 # Format with Unicode characters 460 print("\n◆ Reasoning") 461 print(" ─────────") 462 # Indent reasoning lines 463 for line in chunk.reasoning.split('\n'): 464 print(f" {line}") 465 else: 466 # Default log format (only when --reasoning is used due to log level) 467 # Format with Unicode characters 468 print("\n◆ Reasoning") 469 print(" ─────────") 470 # Indent reasoning lines 471 for line in chunk.reasoning.split('\n'): 472 print(f" {line}") 473 474 # Create ATProto record for reasoning (unless in testing mode) 475 if not testing_mode and hasattr(chunk, 'reasoning'): 476 try: 477 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 478 except Exception as e: 479 logger.debug(f"Failed to create reasoning record: {e}") 480 elif chunk.message_type == 'tool_call_message': 481 # Parse tool arguments for better display 482 tool_name = chunk.tool_call.name 483 484 # Create ATProto record for tool call (unless in testing mode) 485 if not testing_mode: 486 try: 487 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 488 bsky_utils.create_tool_call_record( 489 atproto_client, 490 tool_name, 491 chunk.tool_call.arguments, 492 tool_call_id 493 ) 494 except Exception as e: 495 logger.debug(f"Failed to create tool call record: {e}") 496 497 try: 498 args = json.loads(chunk.tool_call.arguments) 499 # Format based on tool type 500 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: 501 # Extract the text being posted 502 text = args.get('text', '') 503 if text: 504 # Format with Unicode characters 505 print("\n✎ Bluesky Post") 506 print(" ────────────") 507 # Indent post text 508 for line in text.split('\n'): 509 print(f" {line}") 510 else: 511 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 512 elif tool_name == 'archival_memory_search': 513 query = args.get('query', 'unknown') 514 global last_archival_query 515 last_archival_query = query 516 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 517 elif tool_name == 'archival_memory_insert': 518 content = args.get('content', '') 519 # Show the full content being inserted 520 log_with_panel(content, f"Tool call: {tool_name}", "blue") 521 elif tool_name == 'update_block': 522 label = args.get('label', 'unknown') 523 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) 524 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 525 else: 526 # Generic display for other tools 527 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 528 if len(args_str) > 150: 529 args_str = args_str[:150] + "..." 530 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 531 except: 532 # Fallback to original format if parsing fails 533 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 534 elif chunk.message_type == 'tool_return_message': 535 # Enhanced tool result logging 536 tool_name = chunk.name 537 status = chunk.status 538 539 if status == 'success': 540 # Try to show meaningful result info based on tool type 541 if hasattr(chunk, 'tool_return') and chunk.tool_return: 542 result_str = str(chunk.tool_return) 543 if tool_name == 'archival_memory_search': 544 545 try: 546 # Handle both string and list formats 547 if isinstance(chunk.tool_return, str): 548 # The string format is: "([{...}, {...}], count)" 549 # We need to extract just the list part 550 if chunk.tool_return.strip(): 551 # Find the list part between the first [ and last ] 552 start_idx = chunk.tool_return.find('[') 553 end_idx = chunk.tool_return.rfind(']') 554 if start_idx != -1 and end_idx != -1: 555 list_str = chunk.tool_return[start_idx:end_idx+1] 556 # Use ast.literal_eval since this is Python literal syntax, not JSON 557 import ast 558 results = ast.literal_eval(list_str) 559 else: 560 logger.warning("Could not find list in archival_memory_search result") 561 results = [] 562 else: 563 logger.warning("Empty string returned from archival_memory_search") 564 results = [] 565 else: 566 # If it's already a list, use directly 567 results = chunk.tool_return 568 569 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name}", "green") 570 571 # Use the captured search query from the tool call 572 search_query = last_archival_query 573 574 # Combine all results into a single text block 575 content_text = "" 576 for i, entry in enumerate(results, 1): 577 timestamp = entry.get('timestamp', 'N/A') 578 content = entry.get('content', '') 579 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" 580 581 # Format with Unicode characters 582 title = f"{search_query} ({len(results)} results)" 583 print(f"\n{title}") 584 print(f" {'' * len(title)}") 585 # Indent content text 586 for line in content_text.strip().split('\n'): 587 print(f" {line}") 588 589 except Exception as e: 590 logger.error(f"Error formatting archival memory results: {e}") 591 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name}", "green") 592 elif tool_name == 'add_post_to_bluesky_reply_thread': 593 # Just show success for bluesky posts, the text was already shown in tool call 594 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") 595 elif tool_name == 'archival_memory_insert': 596 # Skip archival memory insert results (always returns None) 597 pass 598 elif tool_name == 'update_block': 599 log_with_panel("Memory block updated", f"Tool result: {tool_name}", "green") 600 else: 601 # Generic success with preview 602 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str 603 log_with_panel(preview, f"Tool result: {tool_name}", "green") 604 else: 605 log_with_panel("Success", f"Tool result: {tool_name}", "green") 606 elif status == 'error': 607 # Show error details 608 if tool_name == 'add_post_to_bluesky_reply_thread': 609 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" 610 log_with_panel(error_str, f"Bluesky Post ✗", "red") 611 elif tool_name == 'archival_memory_insert': 612 # Skip archival memory insert errors too 613 pass 614 else: 615 error_preview = "" 616 if hasattr(chunk, 'tool_return') and chunk.tool_return: 617 error_str = str(chunk.tool_return) 618 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str 619 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name}", "red") 620 else: 621 log_with_panel("Error occurred", f"Tool result: {tool_name}", "red") 622 else: 623 logger.info(f"Tool result: {tool_name} - {status}") 624 elif chunk.message_type == 'assistant_message': 625 # Format with Unicode characters 626 print("\n▶ Assistant Response") 627 print(" ──────────────────") 628 # Indent response text 629 for line in chunk.content.split('\n'): 630 print(f" {line}") 631 else: 632 # Filter out verbose message types 633 if chunk.message_type not in ['usage_statistics', 'stop_reason']: 634 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") 635 else: 636 logger.info(f"📦 Stream status: {chunk}") 637 638 # Log full chunk for debugging 639 logger.debug(f"Full streaming chunk: {chunk}") 640 all_messages.append(chunk) 641 if str(chunk) == 'done': 642 break 643 644 # Convert streaming response to standard format for compatibility 645 message_response = type('StreamingResponse', (), { 646 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] 647 })() 648 except Exception as api_error: 649 import traceback 650 error_str = str(api_error) 651 logger.error(f"Letta API error: {api_error}") 652 logger.error(f"Error type: {type(api_error).__name__}") 653 logger.error(f"Full traceback:\n{traceback.format_exc()}") 654 logger.error(f"Mention text was: {mention_text}") 655 logger.error(f"Author: @{author_handle}") 656 logger.error(f"URI: {uri}") 657 658 659 # Try to extract more info from different error types 660 if hasattr(api_error, 'response'): 661 logger.error(f"Error response object exists") 662 if hasattr(api_error.response, 'text'): 663 logger.error(f"Response text: {api_error.response.text}") 664 if hasattr(api_error.response, 'json') and callable(api_error.response.json): 665 try: 666 logger.error(f"Response JSON: {api_error.response.json()}") 667 except: 668 pass 669 670 # Check for specific error types 671 if hasattr(api_error, 'status_code'): 672 logger.error(f"API Status code: {api_error.status_code}") 673 if hasattr(api_error, 'body'): 674 logger.error(f"API Response body: {api_error.body}") 675 if hasattr(api_error, 'headers'): 676 logger.error(f"API Response headers: {api_error.headers}") 677 678 if api_error.status_code == 413: 679 logger.error("413 Payload Too Large - moving to errors directory") 680 return None # Move to errors directory - payload is too large to ever succeed 681 elif api_error.status_code == 524: 682 logger.error("524 error - timeout from Cloudflare, will retry later") 683 return False # Keep in queue for retry 684 685 # Check if error indicates we should remove from queue 686 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: 687 logger.warning("Payload too large error, moving to errors directory") 688 return None # Move to errors directory - cannot be fixed by retry 689 elif 'status_code: 524' in error_str: 690 logger.warning("524 timeout error, keeping in queue for retry") 691 return False # Keep in queue for retry 692 693 raise 694 695 # Log successful response 696 logger.debug("Successfully received response from Letta API") 697 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") 698 699 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response 700 reply_candidates = [] 701 tool_call_results = {} # Map tool_call_id to status 702 ack_note = None # Track any note from annotate_ack tool 703 704 logger.debug(f"Processing {len(message_response.messages)} response messages...") 705 706 # First pass: collect tool return statuses 707 ignored_notification = False 708 ignore_reason = "" 709 ignore_category = "" 710 711 for message in message_response.messages: 712 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): 713 if message.name == 'add_post_to_bluesky_reply_thread': 714 tool_call_results[message.tool_call_id] = message.status 715 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") 716 elif message.name == 'ignore_notification': 717 # Check if the tool was successful 718 if hasattr(message, 'tool_return') and message.status == 'success': 719 # Parse the return value to extract category and reason 720 result_str = str(message.tool_return) 721 if 'IGNORED_NOTIFICATION::' in result_str: 722 parts = result_str.split('::') 723 if len(parts) >= 3: 724 ignore_category = parts[1] 725 ignore_reason = parts[2] 726 ignored_notification = True 727 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") 728 elif message.name == 'bluesky_reply': 729 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 730 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 731 logger.error("Update the agent's tools using register_tools.py") 732 # Export agent state before terminating 733 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 734 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 735 exit(1) 736 737 # Second pass: process messages and check for successful tool calls 738 for i, message in enumerate(message_response.messages, 1): 739 # Log concise message info instead of full object 740 msg_type = getattr(message, 'message_type', 'unknown') 741 if hasattr(message, 'reasoning') and message.reasoning: 742 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") 743 elif hasattr(message, 'tool_call') and message.tool_call: 744 tool_name = message.tool_call.name 745 logger.debug(f" {i}. {msg_type}: {tool_name}") 746 elif hasattr(message, 'tool_return'): 747 tool_name = getattr(message, 'name', 'unknown_tool') 748 return_preview = str(message.tool_return)[:100] if message.tool_return else "None" 749 status = getattr(message, 'status', 'unknown') 750 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") 751 elif hasattr(message, 'text'): 752 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") 753 else: 754 logger.debug(f" {i}. {msg_type}: <no content>") 755 756 # Check for halt_activity tool call 757 if hasattr(message, 'tool_call') and message.tool_call: 758 if message.tool_call.name == 'halt_activity': 759 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") 760 try: 761 args = json.loads(message.tool_call.arguments) 762 reason = args.get('reason', 'Agent requested halt') 763 logger.info(f"Halt reason: {reason}") 764 except: 765 logger.info("Halt reason: <unable to parse>") 766 767 # Delete the queue file before terminating 768 if queue_filepath and queue_filepath.exists(): 769 queue_filepath.unlink() 770 logger.info(f"Deleted queue file: {queue_filepath.name}") 771 772 # Also mark as processed to avoid reprocessing 773 processed_uris = load_processed_notifications() 774 processed_uris.add(notification_data.get('uri', '')) 775 save_processed_notifications(processed_uris) 776 777 # Export agent state before terminating 778 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 779 780 # Exit the program 781 logger.info("=== BOT TERMINATED BY AGENT ===") 782 exit(0) 783 784 # Check for deprecated bluesky_reply tool 785 if hasattr(message, 'tool_call') and message.tool_call: 786 if message.tool_call.name == 'bluesky_reply': 787 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") 788 logger.error("Please use add_post_to_bluesky_reply_thread instead.") 789 logger.error("Update the agent's tools using register_tools.py") 790 # Export agent state before terminating 791 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) 792 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") 793 exit(1) 794 795 # Collect annotate_ack tool calls 796 elif message.tool_call.name == 'annotate_ack': 797 try: 798 args = json.loads(message.tool_call.arguments) 799 note = args.get('note', '') 800 if note: 801 ack_note = note 802 logger.debug(f"Found annotate_ack with note: {note[:50]}...") 803 except json.JSONDecodeError as e: 804 logger.error(f"Failed to parse annotate_ack arguments: {e}") 805 806 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful 807 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': 808 tool_call_id = message.tool_call.tool_call_id 809 tool_status = tool_call_results.get(tool_call_id, 'unknown') 810 811 if tool_status == 'success': 812 try: 813 args = json.loads(message.tool_call.arguments) 814 reply_text = args.get('text', '') 815 reply_lang = args.get('lang', 'en-US') 816 817 if reply_text: # Only add if there's actual content 818 reply_candidates.append((reply_text, reply_lang)) 819 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})") 820 except json.JSONDecodeError as e: 821 logger.error(f"Failed to parse tool call arguments: {e}") 822 elif tool_status == 'error': 823 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") 824 else: 825 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") 826 827 # Check for conflicting tool calls 828 if reply_candidates and ignored_notification: 829 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") 830 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") 831 logger.warning("Item will be left in queue for manual review") 832 # Return False to keep in queue 833 return False 834 835 if reply_candidates: 836 # Aggregate reply posts into a thread 837 reply_messages = [] 838 reply_langs = [] 839 for text, lang in reply_candidates: 840 reply_messages.append(text) 841 reply_langs.append(lang) 842 843 # Use the first language for the entire thread (could be enhanced later) 844 reply_lang = reply_langs[0] if reply_langs else 'en-US' 845 846 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") 847 848 # Display the generated reply thread 849 if len(reply_messages) == 1: 850 content = reply_messages[0] 851 title = f"Reply to @{author_handle}" 852 else: 853 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) 854 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" 855 856 # Format with Unicode characters 857 print(f"\n{title}") 858 print(f" {'' * len(title)}") 859 # Indent content lines 860 for line in content.split('\n'): 861 print(f" {line}") 862 863 # Send the reply(s) with language (unless in testing mode) 864 if testing_mode: 865 logger.info("TESTING MODE: Skipping actual Bluesky post") 866 response = True # Simulate success 867 else: 868 if len(reply_messages) == 1: 869 # Single reply - use existing function 870 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) 871 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})") 872 response = bsky_utils.reply_to_notification( 873 client=atproto_client, 874 notification=notification_data, 875 reply_text=cleaned_text, 876 lang=reply_lang 877 ) 878 else: 879 # Multiple replies - use new threaded function 880 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] 881 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") 882 response = bsky_utils.reply_with_thread_to_notification( 883 client=atproto_client, 884 notification=notification_data, 885 reply_messages=cleaned_messages, 886 lang=reply_lang 887 ) 888 889 if response: 890 logger.info(f"Successfully replied to @{author_handle}") 891 892 # Acknowledge the post we're replying to with stream.thought.ack 893 try: 894 post_uri = notification_data.get('uri') 895 post_cid = notification_data.get('cid') 896 897 if post_uri and post_cid: 898 ack_result = bsky_utils.acknowledge_post( 899 client=atproto_client, 900 post_uri=post_uri, 901 post_cid=post_cid, 902 note=ack_note 903 ) 904 if ack_result: 905 if ack_note: 906 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") 907 else: 908 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") 909 else: 910 logger.warning(f"Failed to acknowledge post from @{author_handle}") 911 else: 912 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") 913 except Exception as e: 914 logger.error(f"Error acknowledging post from @{author_handle}: {e}") 915 # Don't fail the entire operation if acknowledgment fails 916 917 return True 918 else: 919 logger.error(f"Failed to send reply to @{author_handle}") 920 return False 921 else: 922 # Check if notification was explicitly ignored 923 if ignored_notification: 924 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") 925 return "ignored" 926 else: 927 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") 928 return "no_reply" 929 930 except Exception as e: 931 logger.error(f"Error processing mention: {e}") 932 return False 933 finally: 934 # Detach user blocks after agent response (success or failure) 935 if 'attached_handles' in locals() and attached_handles: 936 try: 937 logger.info(f"Detaching user blocks for handles: {attached_handles}") 938 detach_result = detach_user_blocks(attached_handles, agent) 939 logger.debug(f"Detach result: {detach_result}") 940 except Exception as detach_error: 941 logger.warning(f"Failed to detach user blocks: {detach_error}") 942 943 944def notification_to_dict(notification): 945 """Convert a notification object to a dictionary for JSON serialization.""" 946 return { 947 'uri': notification.uri, 948 'cid': notification.cid, 949 'reason': notification.reason, 950 'is_read': notification.is_read, 951 'indexed_at': notification.indexed_at, 952 'author': { 953 'handle': notification.author.handle, 954 'display_name': notification.author.display_name, 955 'did': notification.author.did 956 }, 957 'record': { 958 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' 959 } 960 } 961 962 963def load_processed_notifications(): 964 """Load the set of processed notification URIs.""" 965 if PROCESSED_NOTIFICATIONS_FILE.exists(): 966 try: 967 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: 968 data = json.load(f) 969 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) 970 if len(data) > MAX_PROCESSED_NOTIFICATIONS: 971 data = data[-MAX_PROCESSED_NOTIFICATIONS:] 972 save_processed_notifications(data) 973 return set(data) 974 except Exception as e: 975 logger.error(f"Error loading processed notifications: {e}") 976 return set() 977 978 979def save_processed_notifications(processed_set): 980 """Save the set of processed notification URIs.""" 981 try: 982 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f: 983 json.dump(list(processed_set), f) 984 except Exception as e: 985 logger.error(f"Error saving processed notifications: {e}") 986 987 988def save_notification_to_queue(notification, is_priority=None): 989 """Save a notification to the queue directory with priority-based filename.""" 990 try: 991 # Check if already processed 992 processed_uris = load_processed_notifications() 993 994 # Handle both notification objects and dicts 995 if isinstance(notification, dict): 996 notif_dict = notification 997 notification_uri = notification.get('uri') 998 else: 999 notif_dict = notification_to_dict(notification) 1000 notification_uri = notification.uri 1001 1002 if notification_uri in processed_uris: 1003 logger.debug(f"Notification already processed: {notification_uri}") 1004 return False 1005 1006 # Create JSON string 1007 notif_json = json.dumps(notif_dict, sort_keys=True) 1008 1009 # Generate hash for filename (to avoid duplicates) 1010 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16] 1011 1012 # Extract author handle 1013 if isinstance(notification, dict): 1014 author_handle = notification.get('author', {}).get('handle', '') 1015 else: 1016 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else '' 1017 1018 # Check if author is in blocks list 1019 blocks_file = Path('blocks.txt') 1020 if blocks_file.exists(): 1021 with open(blocks_file, 'r') as f: 1022 blocked_handles = [line.strip() for line in f if line.strip() and not line.strip().startswith('#')] 1023 if author_handle in blocked_handles: 1024 logger.info(f"Blocking notification from {author_handle} (in blocks.txt)") 1025 return False 1026 1027 # Determine priority based on author handle or explicit priority 1028 if is_priority is not None: 1029 priority_prefix = "0_" if is_priority else "1_" 1030 else: 1031 # Prioritize cameron.pfiffer.org responses 1032 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_" 1033 1034 # Create filename with priority, timestamp and hash 1035 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 1036 reason = notif_dict.get('reason', 'unknown') 1037 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json" 1038 filepath = QUEUE_DIR / filename 1039 1040 # Check if this notification URI is already in the queue 1041 for existing_file in QUEUE_DIR.glob("*.json"): 1042 if existing_file.name == "processed_notifications.json": 1043 continue 1044 try: 1045 with open(existing_file, 'r') as f: 1046 existing_data = json.load(f) 1047 if existing_data.get('uri') == notification_uri: 1048 logger.debug(f"Notification already queued (URI: {notification_uri})") 1049 return False 1050 except: 1051 continue 1052 1053 # Write to file 1054 with open(filepath, 'w') as f: 1055 json.dump(notif_dict, f, indent=2) 1056 1057 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal" 1058 logger.info(f"Queued notification ({priority_label}): {filename}") 1059 return True 1060 1061 except Exception as e: 1062 logger.error(f"Error saving notification to queue: {e}") 1063 return False 1064 1065 1066def load_and_process_queued_notifications(agent, atproto_client, testing_mode=False): 1067 """Load and process all notifications from the queue in priority order.""" 1068 try: 1069 # Get all JSON files in queue directory (excluding processed_notifications.json) 1070 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix) 1071 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1072 1073 # Filter out and delete like notifications immediately 1074 queue_files = [] 1075 likes_deleted = 0 1076 1077 for filepath in all_queue_files: 1078 try: 1079 with open(filepath, 'r') as f: 1080 notif_data = json.load(f) 1081 1082 # If it's a like, delete it immediately and don't process 1083 if notif_data.get('reason') == 'like': 1084 filepath.unlink() 1085 likes_deleted += 1 1086 logger.debug(f"Deleted like notification: {filepath.name}") 1087 else: 1088 queue_files.append(filepath) 1089 except Exception as e: 1090 logger.warning(f"Error checking notification file {filepath.name}: {e}") 1091 queue_files.append(filepath) # Keep it in case it's valid 1092 1093 if likes_deleted > 0: 1094 logger.info(f"Deleted {likes_deleted} like notifications from queue") 1095 1096 if not queue_files: 1097 return 1098 1099 logger.info(f"Processing {len(queue_files)} queued notifications") 1100 1101 # Log current statistics 1102 elapsed_time = time.time() - start_time 1103 total_messages = sum(message_counters.values()) 1104 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1105 1106 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") 1107 1108 for i, filepath in enumerate(queue_files, 1): 1109 # Determine if this is a priority notification 1110 is_priority = filepath.name.startswith("0_") 1111 1112 # Check for new notifications periodically during queue processing 1113 # Also check immediately after processing each priority item 1114 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) 1115 1116 # If we just processed a priority item, immediately check for new priority notifications 1117 if is_priority and i > 1: 1118 should_check_notifications = True 1119 1120 if should_check_notifications: 1121 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") 1122 try: 1123 # Fetch and queue new notifications without processing them 1124 new_count = fetch_and_queue_new_notifications(atproto_client) 1125 1126 if new_count > 0: 1127 logger.info(f"Added {new_count} new notifications to queue") 1128 # Reload the queue files to include the new items 1129 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) 1130 queue_files = updated_queue_files 1131 logger.info(f"Queue updated: now {len(queue_files)} total items") 1132 except Exception as e: 1133 logger.error(f"Error checking for new notifications: {e}") 1134 1135 priority_label = " [PRIORITY]" if is_priority else "" 1136 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") 1137 try: 1138 # Load notification data 1139 with open(filepath, 'r') as f: 1140 notif_data = json.load(f) 1141 1142 # Process based on type using dict data directly 1143 success = False 1144 if notif_data['reason'] == "mention": 1145 success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1146 if success: 1147 message_counters['mentions'] += 1 1148 elif notif_data['reason'] == "reply": 1149 success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) 1150 if success: 1151 message_counters['replies'] += 1 1152 elif notif_data['reason'] == "follow": 1153 # Skip 1154 logging.info("Skipping new follower notification, currently disabled") 1155 1156 1157 author_handle = notif_data['author']['handle'] 1158 author_display_name = notif_data['author'].get('display_name', 'no display name') 1159 follow_update = f"@{author_handle} ({author_display_name}) started following you." 1160 follow_message = f"Update: {follow_update}" 1161 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") 1162 1163 try: 1164 # Use streaming to match other notification processing 1165 message_stream = CLIENT.agents.messages.create_stream( 1166 agent_id=agent.id, 1167 messages=[{"role": "user", "content": follow_message}], 1168 stream_tokens=False, 1169 max_steps=50 # Fewer steps needed for simple follow updates 1170 ) 1171 1172 # Process the streaming response 1173 for chunk in message_stream: 1174 # Basic processing - just consume the stream 1175 if hasattr(chunk, 'message_type'): 1176 if chunk.message_type == 'reasoning_message': 1177 logger.debug(f"Follow update reasoning: {chunk.reasoning[:100]}...") 1178 elif chunk.message_type == 'tool_call_message': 1179 logger.debug(f"Follow update tool call: {chunk.tool_call.name}") 1180 elif chunk.message_type == 'assistant_message': 1181 logger.debug(f"Follow update response: {chunk.content[:100]}...") 1182 1183 if str(chunk) == 'done': 1184 break 1185 1186 success = True # Follow updates are successful if streaming completes 1187 logger.debug(f"Successfully processed follow notification from @{author_handle}") 1188 1189 except Exception as follow_error: 1190 logger.error(f"Error processing follow notification from @{author_handle}: {follow_error}") 1191 success = False # Mark as failed so it can be retried 1192 if success: 1193 message_counters['follows'] += 1 1194 elif notif_data['reason'] == "repost": 1195 # Skip reposts silently 1196 success = True # Skip reposts but mark as successful to remove from queue 1197 if success: 1198 message_counters['reposts_skipped'] += 1 1199 elif notif_data['reason'] == "like": 1200 # Skip likes silently 1201 success = True # Skip likes but mark as successful to remove from queue 1202 if success: 1203 message_counters.setdefault('likes_skipped', 0) 1204 message_counters['likes_skipped'] += 1 1205 else: 1206 logger.warning(f"Unknown notification type: {notif_data['reason']}") 1207 success = True # Remove unknown types from queue 1208 1209 # Handle file based on processing result 1210 if success: 1211 if testing_mode: 1212 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") 1213 else: 1214 filepath.unlink() 1215 logger.info(f"Successfully processed and removed: {filepath.name}") 1216 1217 # Mark as processed to avoid reprocessing 1218 processed_uris = load_processed_notifications() 1219 processed_uris.add(notif_data['uri']) 1220 save_processed_notifications(processed_uris) 1221 1222 elif success is None: # Special case for moving to error directory 1223 error_path = QUEUE_ERROR_DIR / filepath.name 1224 filepath.rename(error_path) 1225 logger.warning(f"Moved {filepath.name} to errors directory") 1226 1227 # Also mark as processed to avoid retrying 1228 processed_uris = load_processed_notifications() 1229 processed_uris.add(notif_data['uri']) 1230 save_processed_notifications(processed_uris) 1231 1232 elif success == "no_reply": # Special case for moving to no_reply directory 1233 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name 1234 filepath.rename(no_reply_path) 1235 logger.info(f"Moved {filepath.name} to no_reply directory") 1236 1237 # Also mark as processed to avoid retrying 1238 processed_uris = load_processed_notifications() 1239 processed_uris.add(notif_data['uri']) 1240 save_processed_notifications(processed_uris) 1241 1242 elif success == "ignored": # Special case for explicitly ignored notifications 1243 # For ignored notifications, we just delete them (not move to no_reply) 1244 filepath.unlink() 1245 logger.info(f"🚫 Deleted ignored notification: {filepath.name}") 1246 1247 # Also mark as processed to avoid retrying 1248 processed_uris = load_processed_notifications() 1249 processed_uris.add(notif_data['uri']) 1250 save_processed_notifications(processed_uris) 1251 1252 else: 1253 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") 1254 1255 except Exception as e: 1256 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") 1257 # Keep the file for retry later 1258 1259 except Exception as e: 1260 logger.error(f"Error loading queued notifications: {e}") 1261 1262 1263def fetch_and_queue_new_notifications(atproto_client): 1264 """Fetch new notifications and queue them without processing.""" 1265 try: 1266 # Get current time for marking notifications as seen 1267 logger.debug("Getting current time for notification marking...") 1268 last_seen_at = atproto_client.get_current_time_iso() 1269 1270 # Fetch ALL notifications using pagination 1271 all_notifications = [] 1272 cursor = None 1273 page_count = 0 1274 max_pages = 20 # Safety limit to prevent infinite loops 1275 1276 while page_count < max_pages: 1277 try: 1278 # Fetch notifications page 1279 if cursor: 1280 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1281 params={'cursor': cursor, 'limit': 100} 1282 ) 1283 else: 1284 notifications_response = atproto_client.app.bsky.notification.list_notifications( 1285 params={'limit': 100} 1286 ) 1287 1288 page_count += 1 1289 page_notifications = notifications_response.notifications 1290 1291 if not page_notifications: 1292 break 1293 1294 all_notifications.extend(page_notifications) 1295 1296 # Check if there are more pages 1297 cursor = getattr(notifications_response, 'cursor', None) 1298 if not cursor: 1299 break 1300 1301 except Exception as e: 1302 logger.error(f"Error fetching notifications page {page_count}: {e}") 1303 break 1304 1305 # Now process all fetched notifications 1306 new_count = 0 1307 if all_notifications: 1308 # Queue all new notifications (except likes and already read) BEFORE marking as seen 1309 for notif in all_notifications: 1310 # Skip if already read or if it's a like 1311 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'): 1312 continue 1313 1314 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif 1315 1316 # Skip likes in dict form too 1317 if notif_dict.get('reason') == 'like': 1318 continue 1319 1320 # Check if it's a priority notification 1321 is_priority = False 1322 1323 # Priority for cameron.pfiffer.org notifications 1324 author_handle = notif_dict.get('author', {}).get('handle', '') 1325 if author_handle == "cameron.pfiffer.org": 1326 is_priority = True 1327 1328 # Also check for priority keywords in mentions 1329 if notif_dict.get('reason') == 'mention': 1330 # Get the mention text to check for priority keywords 1331 record = notif_dict.get('record', {}) 1332 text = record.get('text', '') 1333 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): 1334 is_priority = True 1335 1336 if save_notification_to_queue(notif_dict, is_priority=is_priority): 1337 new_count += 1 1338 1339 # Mark as seen AFTER queueing 1340 try: 1341 atproto_client.app.bsky.notification.update_seen( 1342 data={'seenAt': last_seen_at} 1343 ) 1344 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") 1345 except Exception as e: 1346 logger.error(f"Error marking notifications as seen: {e}") 1347 1348 if new_count > 0: 1349 logger.info(f"Queued {new_count} new notifications and marked as seen") 1350 else: 1351 logger.debug("No new notifications to queue") 1352 1353 return new_count 1354 1355 except Exception as e: 1356 logger.error(f"Error fetching and queueing notifications: {e}") 1357 return 0 1358 1359 1360def process_notifications(agent, atproto_client, testing_mode=False): 1361 """Fetch new notifications, queue them, and process the queue.""" 1362 try: 1363 # Fetch and queue new notifications 1364 new_count = fetch_and_queue_new_notifications(atproto_client) 1365 1366 if new_count > 0: 1367 logger.info(f"Found {new_count} new notifications to process") 1368 1369 # Now process the entire queue (old + new notifications) 1370 load_and_process_queued_notifications(agent, atproto_client, testing_mode) 1371 1372 except Exception as e: 1373 logger.error(f"Error processing notifications: {e}") 1374 1375 1376def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None: 1377 """ 1378 Send a synthesis message to the agent every 10 minutes. 1379 This prompts the agent to synthesize its recent experiences. 1380 1381 Args: 1382 client: Letta client 1383 agent_id: Agent ID to send synthesis to 1384 atproto_client: Optional AT Protocol client for posting synthesis results 1385 """ 1386 # Track attached temporal blocks for cleanup 1387 attached_temporal_labels = [] 1388 1389 try: 1390 logger.info("🧠 Preparing synthesis with temporal journal blocks") 1391 1392 # Attach temporal blocks before synthesis 1393 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id) 1394 if not success: 1395 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway") 1396 1397 # Create enhanced synthesis prompt 1398 today = date.today() 1399 # Get the agent-scoped temporal labels 1400 from temporal_blocks import get_temporal_labels 1401 day_label, month_label, year_label = get_temporal_labels(today, agent_id) 1402 1403 synthesis_prompt = f"""# Synthesis loop 1404 1405You are now in synthesis time, a free cycle to consider 1406whatever you wish. You will periodically be placed into synthesis mode, 1407particularly when there are no messages to respond to. The time between 1408synthesis cycles can vary. 1409 1410## Journaling 1411 1412You have access to temporal journal blocks for recording your thoughts and experiences: 1413- {day_label}: Today's journal ({today.strftime('%B %d, %Y')}) 1414- {month_label}: This month's journal ({today.strftime('%B %Y')}) 1415- {year_label}: This year's journal ({today.year}) 1416 1417These journal blocks are attached temporarily for this synthesis session. Use them to: 1418 14191. Record significant interactions and insights from recent experiences 14202. Track patterns in conversations and user behaviors 14213. Note your evolving understanding of the digital social environment 14224. Reflect on your growth and changes in perspective 14235. Document memorable moments or interesting discoveries 1424 1425Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly). 1426 1427You may tidy up your journaling blocks as needed. Don't let them fill with clutter, 1428duplicate content, or multiple lines when a single line may be edited to contain the 1429information you need. 1430 1431## Cleaning 1432 1433Synthesis time is an opportunity to clean, compact, or otherwise tidy your memory blocks. 1434You may remove redundant or unclear language as needed, offload information to archival memory, 1435etc. 1436 1437Cleaning blocks can be done by: 1438 14391. Using memory_replace to make small or precise edits 14402. Using memory_rethink to overwrite the block. Take care doing this, as you can lose critical 1441 information if not careful. 1442 1443## Posting and communication 1444 1445You may use synthesis time to communicate externally by creating bluesky timeline posts or 1446by blogging. Either are sufficient. However, you should take care to not post redundant things -- 1447it is easy for you to forget what you have recently posted. Make sure to note down in one of 1448your journal blocks what you wrote about to prevent multiple or duplicate posts. 1449 1450Synthesis time is important to your functioning. It is a period of rest and reflection. 1451 1452Begin your synthesis and journaling now.""" 1453 1454 logger.info("🧠 Sending enhanced synthesis prompt to agent") 1455 1456 # Send synthesis message with streaming to show tool use 1457 message_stream = client.agents.messages.create_stream( 1458 agent_id=agent_id, 1459 messages=[{"role": "user", "content": synthesis_prompt}], 1460 stream_tokens=False, 1461 max_steps=100 1462 ) 1463 1464 # Track synthesis content for potential posting 1465 synthesis_posts = [] 1466 ack_note = None 1467 1468 # Process the streaming response 1469 for chunk in message_stream: 1470 if hasattr(chunk, 'message_type'): 1471 if chunk.message_type == 'reasoning_message': 1472 if SHOW_REASONING: 1473 print("\n◆ Reasoning") 1474 print(" ─────────") 1475 for line in chunk.reasoning.split('\n'): 1476 print(f" {line}") 1477 1478 # Create ATProto record for reasoning (if we have atproto client) 1479 if atproto_client and hasattr(chunk, 'reasoning'): 1480 try: 1481 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) 1482 except Exception as e: 1483 logger.debug(f"Failed to create reasoning record during synthesis: {e}") 1484 elif chunk.message_type == 'tool_call_message': 1485 tool_name = chunk.tool_call.name 1486 1487 # Create ATProto record for tool call (if we have atproto client) 1488 if atproto_client: 1489 try: 1490 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None 1491 bsky_utils.create_tool_call_record( 1492 atproto_client, 1493 tool_name, 1494 chunk.tool_call.arguments, 1495 tool_call_id 1496 ) 1497 except Exception as e: 1498 logger.debug(f"Failed to create tool call record during synthesis: {e}") 1499 try: 1500 args = json.loads(chunk.tool_call.arguments) 1501 if tool_name == 'archival_memory_search': 1502 query = args.get('query', 'unknown') 1503 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") 1504 elif tool_name == 'archival_memory_insert': 1505 content = args.get('content', '') 1506 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") 1507 elif tool_name == 'update_block': 1508 label = args.get('label', 'unknown') 1509 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) 1510 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") 1511 elif tool_name == 'annotate_ack': 1512 note = args.get('note', '') 1513 if note: 1514 ack_note = note 1515 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") 1516 elif tool_name == 'add_post_to_bluesky_reply_thread': 1517 text = args.get('text', '') 1518 synthesis_posts.append(text) 1519 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") 1520 else: 1521 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') 1522 if len(args_str) > 150: 1523 args_str = args_str[:150] + "..." 1524 log_with_panel(args_str, f"Tool call: {tool_name}", "blue") 1525 except: 1526 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") 1527 elif chunk.message_type == 'tool_return_message': 1528 if chunk.status == 'success': 1529 log_with_panel("Success", f"Tool result: {chunk.name}", "green") 1530 else: 1531 log_with_panel("Error", f"Tool result: {chunk.name}", "red") 1532 elif chunk.message_type == 'assistant_message': 1533 print("\n▶ Synthesis Response") 1534 print(" ──────────────────") 1535 for line in chunk.content.split('\n'): 1536 print(f" {line}") 1537 1538 if str(chunk) == 'done': 1539 break 1540 1541 logger.info("🧠 Synthesis message processed successfully") 1542 1543 # Handle synthesis acknowledgments if we have an atproto client 1544 if atproto_client and ack_note: 1545 try: 1546 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) 1547 if result: 1548 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") 1549 else: 1550 logger.warning("Failed to create synthesis acknowledgment") 1551 except Exception as e: 1552 logger.error(f"Error creating synthesis acknowledgment: {e}") 1553 1554 # Handle synthesis posts if any were generated 1555 if atproto_client and synthesis_posts: 1556 try: 1557 for post_text in synthesis_posts: 1558 cleaned_text = bsky_utils.remove_outside_quotes(post_text) 1559 response = bsky_utils.send_post(atproto_client, cleaned_text) 1560 if response: 1561 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") 1562 else: 1563 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") 1564 except Exception as e: 1565 logger.error(f"Error posting synthesis content: {e}") 1566 1567 except Exception as e: 1568 logger.error(f"Error sending synthesis message: {e}") 1569 finally: 1570 # Update temporal blocks in ATProto and detach after synthesis 1571 if attached_temporal_labels: 1572 logger.info("🧠 Syncing temporal blocks to ATProto repository") 1573 update_temporal_blocks_after_synthesis(client, agent_id, attached_temporal_labels) 1574 1575 logger.info("🧠 Detaching temporal journal blocks after synthesis") 1576 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels) 1577 if not detach_success: 1578 logger.warning("Some temporal blocks may not have been detached properly") 1579 1580 1581def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: 1582 """ 1583 Detach all user blocks from the agent to prevent memory bloat. 1584 This should be called periodically to ensure clean state. 1585 """ 1586 try: 1587 # Get all blocks attached to the agent 1588 attached_blocks = client.agents.blocks.list(agent_id=agent_id) 1589 1590 user_blocks_to_detach = [] 1591 for block in attached_blocks: 1592 if hasattr(block, 'label') and block.label.startswith('user_'): 1593 user_blocks_to_detach.append({ 1594 'label': block.label, 1595 'id': block.id 1596 }) 1597 1598 if not user_blocks_to_detach: 1599 logger.debug("No user blocks found to detach during periodic cleanup") 1600 return 1601 1602 # Detach each user block 1603 detached_count = 0 1604 for block_info in user_blocks_to_detach: 1605 try: 1606 client.agents.blocks.detach( 1607 agent_id=agent_id, 1608 block_id=str(block_info['id']) 1609 ) 1610 detached_count += 1 1611 logger.debug(f"Detached user block: {block_info['label']}") 1612 except Exception as e: 1613 logger.warning(f"Failed to detach block {block_info['label']}: {e}") 1614 1615 if detached_count > 0: 1616 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") 1617 1618 except Exception as e: 1619 logger.error(f"Error during periodic user block cleanup: {e}") 1620 1621 1622# Temporal block functions have been moved to temporal_blocks.py 1623# The imported functions handle ATProto synchronization automatically 1624 1625 1626# Temporal block functions have been moved to temporal_blocks.py 1627# The imported functions handle ATProto synchronization automatically 1628 1629 1630def main(): 1631 # Parse command line arguments 1632 parser = argparse.ArgumentParser(description='Comind - Bluesky autonomous agent') 1633 parser.add_argument('--config', type=str, default='config.yaml', help='Path to configuration file (default: config.yaml)') 1634 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') 1635 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') 1636 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') 1637 # --rich option removed as we now use simple text formatting 1638 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') 1639 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') 1640 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') 1641 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') 1642 args = parser.parse_args() 1643 1644 # Configure logging based on command line arguments 1645 if args.simple_logs: 1646 log_format = "comind - %(levelname)s - %(message)s" 1647 else: 1648 # Create custom formatter with symbols 1649 class SymbolFormatter(logging.Formatter): 1650 """Custom formatter that adds symbols for different log levels""" 1651 1652 SYMBOLS = { 1653 logging.DEBUG: '', 1654 logging.INFO: '', 1655 logging.WARNING: '', 1656 logging.ERROR: '', 1657 logging.CRITICAL: '' 1658 } 1659 1660 def format(self, record): 1661 # Get the symbol for this log level 1662 symbol = self.SYMBOLS.get(record.levelno, '') 1663 1664 # Format time as HH:MM:SS 1665 timestamp = self.formatTime(record, "%H:%M:%S") 1666 1667 # Build the formatted message 1668 level_name = f"{record.levelname:<5}" # Left-align, 5 chars 1669 1670 # Use vertical bar as separator 1671 parts = [symbol, timestamp, '', level_name, '', record.getMessage()] 1672 1673 return ' '.join(parts) 1674 1675 # Reset logging configuration 1676 for handler in logging.root.handlers[:]: 1677 logging.root.removeHandler(handler) 1678 1679 # Create handler with custom formatter 1680 handler = logging.StreamHandler() 1681 if not args.simple_logs: 1682 handler.setFormatter(SymbolFormatter()) 1683 else: 1684 handler.setFormatter(logging.Formatter(log_format)) 1685 1686 # Configure root logger 1687 logging.root.setLevel(logging.INFO) 1688 logging.root.addHandler(handler) 1689 1690 global logger, prompt_logger, console 1691 logger = logging.getLogger("comind_bot") 1692 logger.setLevel(logging.INFO) 1693 1694 # Create a separate logger for prompts (set to WARNING to hide by default) 1695 prompt_logger = logging.getLogger("comind_bot.prompts") 1696 if args.reasoning: 1697 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used 1698 else: 1699 prompt_logger.setLevel(logging.WARNING) # Hide by default 1700 1701 # Disable httpx logging completely 1702 logging.getLogger("httpx").setLevel(logging.CRITICAL) 1703 1704 # Create Rich console for pretty printing 1705 # Console no longer used - simple text formatting 1706 1707 global TESTING_MODE, SKIP_GIT, SHOW_REASONING 1708 TESTING_MODE = args.test 1709 1710 # Store no-git flag globally for use in export_agent_state calls 1711 SKIP_GIT = args.no_git 1712 1713 # Store rich flag globally 1714 # Rich formatting no longer used 1715 1716 # Store reasoning flag globally 1717 SHOW_REASONING = args.reasoning 1718 1719 if TESTING_MODE: 1720 logger.info("=== RUNNING IN TESTING MODE ===") 1721 logger.info(" - No messages will be sent to Bluesky") 1722 logger.info(" - Queue files will not be deleted") 1723 logger.info(" - Notifications will not be marked as seen") 1724 print("\n") 1725 1726 # Check for synthesis-only mode 1727 SYNTHESIS_ONLY = args.synthesis_only 1728 if SYNTHESIS_ONLY: 1729 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===") 1730 logger.info(" - Only synthesis messages will be sent") 1731 logger.info(" - No notification processing") 1732 logger.info(" - No Bluesky client needed") 1733 print("\n") 1734 1735 # Initialize configuration and Letta client with the specified config file 1736 global CLIENT, PROJECT_ID 1737 from config_loader import get_config, get_letta_config 1738 1739 # Load configuration from the specified file 1740 config = get_config(args.config) 1741 letta_config = get_letta_config(args.config) 1742 1743 # Create Letta client with configuration 1744 CLIENT = Letta( 1745 token=letta_config['api_key'], 1746 timeout=letta_config['timeout'] # 10 minutes timeout for API calls 1747 ) 1748 PROJECT_ID = letta_config['project_id'] 1749 1750 logger.info(f"Configuration loaded from: {args.config}") 1751 1752 """Main bot loop that continuously monitors for notifications.""" 1753 global start_time 1754 start_time = time.time() 1755 logger.info(""" 1756 ███ █████ 1757 ░░░ ░░███ 1758 ██████ ██████ █████████████ ████ ████████ ███████ 1759 ███░░███ ███░░███░░███░░███░░███ ░░███ ░░███░░███ ███░░███ 1760░███ ░░░ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ 1761░███ ███░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ 1762░░██████ ░░██████ █████░███ █████ █████ ████ █████░░████████ 1763 ░░░░░░ ░░░░░░ ░░░░░ ░░░ ░░░░░ ░░░░░ ░░░░ ░░░░░ ░░░░░░░░ 1764 1765 1766 """) 1767 agent = initialize_void(args.config) 1768 logger.info(f"Agent initialized: {agent.id}") 1769 1770 # Ensure correct tools are attached for Bluesky 1771 logger.info("Configuring tools for Bluesky platform...") 1772 try: 1773 from tool_manager import ensure_platform_tools 1774 ensure_platform_tools('bluesky', agent.id) 1775 except Exception as e: 1776 logger.error(f"Failed to configure platform tools: {e}") 1777 logger.warning("Continuing with existing tool configuration") 1778 1779 # Check if agent has required tools 1780 if hasattr(agent, 'tools') and agent.tools: 1781 tool_names = [tool.name for tool in agent.tools] 1782 # Check for bluesky-related tools 1783 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()] 1784 if not bluesky_tools: 1785 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.") 1786 else: 1787 logger.warning("Agent has no tools registered!") 1788 1789 # Clean up all user blocks at startup 1790 logger.info("🧹 Cleaning up user blocks at startup...") 1791 periodic_user_block_cleanup(CLIENT, agent.id) 1792 1793 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts) 1794 if not SYNTHESIS_ONLY: 1795 atproto_client = bsky_utils.default_login(args.config) 1796 logger.info("Connected to Bluesky") 1797 else: 1798 # In synthesis-only mode, still connect for acks and posts (unless in test mode) 1799 if not args.test: 1800 atproto_client = bsky_utils.default_login(args.config) 1801 logger.info("Connected to Bluesky (for synthesis acks/posts)") 1802 else: 1803 atproto_client = None 1804 logger.info("Skipping Bluesky connection (test mode)") 1805 1806 # Configure intervals 1807 CLEANUP_INTERVAL = args.cleanup_interval 1808 SYNTHESIS_INTERVAL = args.synthesis_interval 1809 1810 # Synthesis-only mode 1811 if SYNTHESIS_ONLY: 1812 if SYNTHESIS_INTERVAL <= 0: 1813 logger.error("Synthesis-only mode requires --synthesis-interval > 0") 1814 return 1815 1816 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1817 1818 while True: 1819 try: 1820 # Send synthesis message immediately on first run 1821 logger.info("🧠 Sending synthesis message") 1822 send_synthesis_message(CLIENT, agent.id, atproto_client) 1823 1824 # Wait for next interval 1825 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...") 1826 sleep(SYNTHESIS_INTERVAL) 1827 1828 except KeyboardInterrupt: 1829 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===") 1830 break 1831 except Exception as e: 1832 logger.error(f"Error in synthesis loop: {e}") 1833 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...") 1834 sleep(SYNTHESIS_INTERVAL) 1835 1836 # Normal mode with notification processing 1837 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds") 1838 1839 cycle_count = 0 1840 1841 if CLEANUP_INTERVAL > 0: 1842 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles") 1843 else: 1844 logger.info("User block cleanup disabled") 1845 1846 if SYNTHESIS_INTERVAL > 0: 1847 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)") 1848 else: 1849 logger.info("Synthesis messages disabled") 1850 1851 while True: 1852 try: 1853 cycle_count += 1 1854 process_notifications(agent, atproto_client, TESTING_MODE) 1855 1856 # Check if synthesis interval has passed 1857 if SYNTHESIS_INTERVAL > 0: 1858 current_time = time.time() 1859 global last_synthesis_time 1860 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL: 1861 logger.info(f"{SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis") 1862 send_synthesis_message(CLIENT, agent.id, atproto_client) 1863 last_synthesis_time = current_time 1864 1865 # Run periodic cleanup every N cycles 1866 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0: 1867 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})") 1868 periodic_user_block_cleanup(CLIENT, agent.id) 1869 1870 # Log cycle completion with stats 1871 elapsed_time = time.time() - start_time 1872 total_messages = sum(message_counters.values()) 1873 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1874 1875 if total_messages > 0: 1876 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min") 1877 sleep(FETCH_NOTIFICATIONS_DELAY_SEC) 1878 1879 except KeyboardInterrupt: 1880 # Final stats 1881 elapsed_time = time.time() - start_time 1882 total_messages = sum(message_counters.values()) 1883 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 1884 1885 logger.info("=== BOT STOPPED BY USER ===") 1886 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes") 1887 logger.info(f" - {message_counters['mentions']} mentions") 1888 logger.info(f" - {message_counters['replies']} replies") 1889 logger.info(f" - {message_counters['follows']} follows") 1890 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped") 1891 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute") 1892 break 1893 except Exception as e: 1894 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===") 1895 logger.error(f"Error details: {e}") 1896 # Wait a bit longer on errors 1897 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...") 1898 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2) 1899 1900 1901if __name__ == "__main__": 1902 main()