# Rich imports removed - using simple text formatting
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time
import argparse
from utils import (
    upsert_block,
    upsert_agent
)
import bsky_utils
from tools.blocks import attach_user_blocks, detach_user_blocks
from datetime import date
from temporal_blocks import (
    attach_temporal_blocks,
    detach_temporal_blocks,
    update_temporal_blocks_after_synthesis
)


def extract_handles_from_data(data):
    """Recursively extract all unique handles from nested data structure."""
    handles = set()

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            # Check if this dict has a 'handle' key
            if 'handle' in obj:
                handles.add(obj['handle'])
            # Recursively check all values
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, list):
            # Recursively check all list items
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(handles)


# Logging will be configured after argument parsing
logger = None
prompt_logger = None

# Simple text formatting (Rich no longer used)
SHOW_REASONING = False
last_archival_query = "archival memory search"


def log_with_panel(message, title=None, border_color="white"):
    """Log a message with Unicode box-drawing characters"""
    if title:
        # Map old color names to appropriate symbols
        symbol_map = {
            "blue": "⚙",    # Tool calls
            "green": "✓",   # Success/completion
            "yellow": "◆",  # Reasoning
            "red": "✗",     # Errors
            "white": "▶",   # Default/mentions
            "cyan": "✎",    # Posts
        }
        symbol = symbol_map.get(border_color, "▶")
        print(f"\n{symbol} {title}")
        print(f" {'─' * len(title)}")
        # Indent message lines
        for line in message.split('\n'):
            print(f" {line}")
    else:
        print(message)


# Global variables for Letta client and config
# These will be initialized in main() with the proper config file
CLIENT = None
PROJECT_ID = None

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 10  # Check every 10 seconds for faster response

# Check for new notifications every N queue items
CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2  # Check more frequently during processing

# Queue directory - will be initialized with agent ID later
QUEUE_DIR = None
QUEUE_ERROR_DIR = None
QUEUE_NO_REPLY_DIR = None
PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")

# Maximum number of processed notifications to track
MAX_PROCESSED_NOTIFICATIONS = 10000

# Message tracking counters
message_counters = defaultdict(int)
start_time = time.time()

# Testing mode flag
TESTING_MODE = False

# Skip git operations flag
SKIP_GIT = False

# Synthesis message tracking
last_synthesis_time = time.time()
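# Hedged usage sketch (not called anywhere in the bot): shows the kind of
# nested payload extract_handles_from_data() is designed to walk. The sample
# structure below is an illustrative assumption, not real Bluesky API output.
def _example_extract_handles():
    sample_thread = {
        "post": {"author": {"handle": "alice.example.com"}},
        "replies": [
            {"post": {"author": {"handle": "bob.example.com"}}},
        ],
    }
    # Returns ["alice.example.com", "bob.example.com"] (order not guaranteed)
    return extract_handles_from_data(sample_thread)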
def export_agent_state(client, agent, skip_git=False):
    """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
    try:
        # Confirm export with user unless git is being skipped
        if not skip_git:
            response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
            if response not in ['y', 'yes']:
                logger.info("Agent export cancelled by user.")
                return
        else:
            logger.info("Exporting agent state (git staging disabled)")

        # Create directories if they don't exist
        os.makedirs("agent_archive", exist_ok=True)
        os.makedirs("agents", exist_ok=True)

        # Export agent data
        logger.info(f"Exporting agent {agent.id}. This takes some time...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        # Save timestamped archive copy
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
        with open(archive_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        # Save current agent state
        current_file = os.path.join("agents", "void.af")
        with open(current_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        logger.info(f"Agent exported to {archive_file} and {current_file}")

        # Git add only the current agent file (archive is ignored) unless skip_git is True
        if not skip_git:
            try:
                subprocess.run(["git", "add", current_file], check=True, capture_output=True)
                logger.info("Added current agent file to git staging")
            except subprocess.CalledProcessError as e:
                logger.warning(f"Failed to git add agent file: {e}")

    except Exception as e:
        logger.error(f"Failed to export agent: {e}")


def initialize_void(config_path="config.yaml"):
    logger.info("Starting agent initialization...")

    # Get the configured agent by ID
    logger.info("Loading agent from config...")
    from config_loader import get_letta_config
    letta_config = get_letta_config(config_path)
    agent_id = letta_config['agent_id']

    try:
        agent = CLIENT.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded agent: {agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load agent {agent_id}: {e}")
        logger.error("Please ensure the agent_id in config.yaml is correct")
        raise e

    # Initialize agent-specific queue directories
    global QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR
    QUEUE_DIR = Path(f"queue/{agent_id}")
    QUEUE_DIR.mkdir(exist_ok=True, parents=True)
    QUEUE_ERROR_DIR = QUEUE_DIR / "errors"
    QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
    QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply"
    QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
    logger.info(f"Initialized queue directories for agent {agent_id}")

    # Export agent state
    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, agent, skip_git=SKIP_GIT)

    # Log agent details
    logger.info(f"Agent details - ID: {agent.id}")
    logger.info(f"Agent name: {agent.name}")
    if hasattr(agent, 'llm_config'):
        logger.info(f"Agent model: {agent.llm_config.model}")
    logger.info(f"Agent project_id: {agent.project_id}")
    if hasattr(agent, 'tools'):
        logger.info(f"Agent has {len(agent.tools)} tools")
        for tool in agent.tools[:3]:  # Show first 3 tools
            logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")

    # Migrate old queue files if they exist
    migrate_old_queue_files(agent_id)

    return agent
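# For reference, a sketch of the on-disk queue layout created above,
# summarised from the code in this module (paths relative to the working dir):
#
#   queue/processed_notifications.json   <- flat JSON list of already-handled URIs
#   queue/<agent_id>/                    <- pending notifications, one JSON file each
#   queue/<agent_id>/errors/             <- non-retryable failures (e.g. 413 payloads)
#   queue/<agent_id>/no_reply/           <- items the agent produced no reply for
#
# Queue filenames start with "0_" (priority) or "1_" (normal), so a plain
# lexicographic sort processes priority items first.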
{old_file.name}: {e}") if migrated_count > 0: logger.info(f"Successfully migrated {migrated_count} queue files to agent-specific directory") # Also check and migrate files from old errors and no_reply directories for subdir in ["errors", "no_reply"]: old_subdir = old_queue_dir / subdir if old_subdir.exists(): old_subdir_files = list(old_subdir.glob("*.json")) if old_subdir_files: new_subdir = QUEUE_DIR / subdir new_subdir.mkdir(exist_ok=True, parents=True) for old_file in old_subdir_files: try: new_file = new_subdir / old_file.name old_file.rename(new_file) logger.debug(f"Migrated {subdir}/{old_file.name}") except Exception as e: logger.warning(f"Failed to migrate {subdir}/{old_file.name}: {e}") def process_mention(agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False): """Process a mention and generate a reply using the Letta agent. Args: agent: The Letta agent instance atproto_client: The AT Protocol client notification_data: The notification data dictionary queue_filepath: Optional Path object to the queue file (for cleanup on halt) Returns: True: Successfully processed, remove from queue False: Failed but retryable, keep in queue None: Failed with non-retryable error, move to errors directory "no_reply": No reply was generated, move to no_reply directory """ try: logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}") # Handle both dict and object inputs for backwards compatibility if isinstance(notification_data, dict): uri = notification_data['uri'] mention_text = notification_data.get('record', {}).get('text', '') author_handle = notification_data['author']['handle'] author_name = notification_data['author'].get('display_name') or author_handle else: # Legacy object access uri = notification_data.uri mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else "" author_handle = notification_data.author.handle author_name = notification_data.author.display_name or author_handle logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...") # Retrieve the entire thread associated with the mention try: thread = atproto_client.app.bsky.feed.get_post_thread({ 'uri': uri, 'parent_height': 40, 'depth': 10 }) except Exception as e: error_str = str(e) # Check if this is a NotFound error if 'NotFound' in error_str or 'Post not found' in error_str: logger.warning(f"Post not found for URI {uri}, removing from queue") return True # Return True to remove from queue else: # Re-raise other errors logger.error(f"Error fetching thread: {e}") raise # Get thread context as YAML string logger.debug("Converting thread to YAML string") try: thread_context = thread_to_yaml_string(thread) logger.debug(f"Thread context generated, length: {len(thread_context)} characters") # Check if #voidstop appears anywhere in the thread if "#voidstop" in thread_context.lower(): logger.info("Found #voidstop in thread context, skipping this mention") return True # Return True to remove from queue # Also check the mention text directly if "#voidstop" in mention_text.lower(): logger.info("Found #voidstop in mention text, skipping this mention") return True # Return True to remove from queue # Create a more informative preview by extracting meaningful content lines = thread_context.split('\n') meaningful_lines = [] for line in lines: stripped = line.strip() if not stripped: continue # Look for lines with actual content (not just structure) if any(keyword in line for keyword in ['text:', 
'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']): meaningful_lines.append(line) if len(meaningful_lines) >= 5: break if meaningful_lines: preview = '\n'.join(meaningful_lines) logger.debug(f"Thread content preview:\n{preview}") else: # If no content fields found, just show it's a thread structure logger.debug(f"Thread structure generated ({len(thread_context)} chars)") except Exception as yaml_error: import traceback logger.error(f"Error converting thread to YAML: {yaml_error}") logger.error(f"Full traceback:\n{traceback.format_exc()}") logger.error(f"Thread type: {type(thread)}") if hasattr(thread, '__dict__'): logger.error(f"Thread attributes: {thread.__dict__}") # Try to continue with a simple context thread_context = f"Error processing thread context: {str(yaml_error)}" # Create a prompt for the Letta agent with thread context prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}). MOST RECENT POST (the mention you're responding to): "{mention_text}" FULL THREAD CONTEXT: ```yaml {thread_context} ``` The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow. To reply, use the add_post_to_bluesky_reply_thread tool: - Each call creates one post (max 300 characters) - For most responses, a single call is sufficient - Only use multiple calls for threaded replies when: * The topic requires extended explanation that cannot fit in 300 characters * You're explicitly asked for a detailed/long response * The conversation naturally benefits from a structured multi-part answer - Avoid unnecessary threads - be concise when possible""" # Extract all handles from notification and thread data all_handles = set() all_handles.update(extract_handles_from_data(notification_data)) all_handles.update(extract_handles_from_data(thread.model_dump())) unique_handles = list(all_handles) logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}") # Check if any handles are in known_bots list from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs import json try: # Check for known bots in thread bot_check_result = check_known_bots(unique_handles, agent) bot_check_data = json.loads(bot_check_result) if bot_check_data.get("bot_detected", False): detected_bots = bot_check_data.get("detected_bots", []) logger.info(f"Bot detected in thread: {detected_bots}") # Decide whether to respond (10% chance) if not should_respond_to_bot_thread(): logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}") # Return False to keep in queue for potential later processing return False else: logger.info(f"Responding to bot thread (10% response rate). 
Detected bots: {detected_bots}") else: logger.debug("No known bots detected in thread") except Exception as bot_check_error: logger.warning(f"Error checking for bots: {bot_check_error}") # Continue processing if bot check fails # Attach user blocks before agent call attached_handles = [] if unique_handles: try: logger.debug(f"Attaching user blocks for handles: {unique_handles}") attach_result = attach_user_blocks(unique_handles, agent) attached_handles = unique_handles # Track successfully attached handles logger.debug(f"Attach result: {attach_result}") except Exception as attach_error: logger.warning(f"Failed to attach user blocks: {attach_error}") # Continue without user blocks rather than failing completely # Get response from Letta agent # Format with Unicode characters title = f"MENTION FROM @{author_handle}" print(f"\n▶ {title}") print(f" {'═' * len(title)}") # Indent the mention text for line in mention_text.split('\n'): print(f" {line}") # Log prompt details to separate logger prompt_logger.debug(f"Full prompt being sent:\n{prompt}") # Log concise prompt info to main logger thread_handles_count = len(unique_handles) prompt_char_count = len(prompt) logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars") try: # Use streaming to avoid 524 timeout errors message_stream = CLIENT.agents.messages.create_stream( agent_id=agent.id, messages=[{"role": "user", "content": prompt}], stream_tokens=False, # Step streaming only (faster than token streaming) max_steps=100 ) # Collect the streaming response all_messages = [] for chunk in message_stream: # Log condensed chunk info if hasattr(chunk, 'message_type'): if chunk.message_type == 'reasoning_message': # Show full reasoning without truncation if SHOW_REASONING: # Format with Unicode characters print("\n◆ Reasoning") print(" ─────────") # Indent reasoning lines for line in chunk.reasoning.split('\n'): print(f" {line}") else: # Default log format (only when --reasoning is used due to log level) # Format with Unicode characters print("\n◆ Reasoning") print(" ─────────") # Indent reasoning lines for line in chunk.reasoning.split('\n'): print(f" {line}") # Create ATProto record for reasoning (unless in testing mode) if not testing_mode and hasattr(chunk, 'reasoning'): try: bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) except Exception as e: logger.debug(f"Failed to create reasoning record: {e}") elif chunk.message_type == 'tool_call_message': # Parse tool arguments for better display tool_name = chunk.tool_call.name # Create ATProto record for tool call (unless in testing mode) if not testing_mode: try: tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None bsky_utils.create_tool_call_record( atproto_client, tool_name, chunk.tool_call.arguments, tool_call_id ) except Exception as e: logger.debug(f"Failed to create tool call record: {e}") try: args = json.loads(chunk.tool_call.arguments) # Format based on tool type if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']: # Extract the text being posted text = args.get('text', '') if text: # Format with Unicode characters print("\n✎ Bluesky Post") print(" ────────────") # Indent post text for line in text.split('\n'): print(f" {line}") else: log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") elif tool_name == 'archival_memory_search': query = 
args.get('query', 'unknown') global last_archival_query last_archival_query = query log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") elif tool_name == 'archival_memory_insert': content = args.get('content', '') # Show the full content being inserted log_with_panel(content, f"Tool call: {tool_name}", "blue") elif tool_name == 'update_block': label = args.get('label', 'unknown') value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', '')) log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") else: # Generic display for other tools args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') if len(args_str) > 150: args_str = args_str[:150] + "..." log_with_panel(args_str, f"Tool call: {tool_name}", "blue") except: # Fallback to original format if parsing fails log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") elif chunk.message_type == 'tool_return_message': # Enhanced tool result logging tool_name = chunk.name status = chunk.status if status == 'success': # Try to show meaningful result info based on tool type if hasattr(chunk, 'tool_return') and chunk.tool_return: result_str = str(chunk.tool_return) if tool_name == 'archival_memory_search': try: # Handle both string and list formats if isinstance(chunk.tool_return, str): # The string format is: "([{...}, {...}], count)" # We need to extract just the list part if chunk.tool_return.strip(): # Find the list part between the first [ and last ] start_idx = chunk.tool_return.find('[') end_idx = chunk.tool_return.rfind(']') if start_idx != -1 and end_idx != -1: list_str = chunk.tool_return[start_idx:end_idx+1] # Use ast.literal_eval since this is Python literal syntax, not JSON import ast results = ast.literal_eval(list_str) else: logger.warning("Could not find list in archival_memory_search result") results = [] else: logger.warning("Empty string returned from archival_memory_search") results = [] else: # If it's already a list, use directly results = chunk.tool_return log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green") # Use the captured search query from the tool call search_query = last_archival_query # Combine all results into a single text block content_text = "" for i, entry in enumerate(results, 1): timestamp = entry.get('timestamp', 'N/A') content = entry.get('content', '') content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n" # Format with Unicode characters title = f"{search_query} ({len(results)} results)" print(f"\n⚙ {title}") print(f" {'─' * len(title)}") # Indent content text for line in content_text.strip().split('\n'): print(f" {line}") except Exception as e: logger.error(f"Error formatting archival memory results: {e}") log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green") elif tool_name == 'add_post_to_bluesky_reply_thread': # Just show success for bluesky posts, the text was already shown in tool call log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green") elif tool_name == 'archival_memory_insert': # Skip archival memory insert results (always returns None) pass elif tool_name == 'update_block': log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green") else: # Generic success with preview preview = result_str[:100] + "..." 
if len(result_str) > 100 else result_str log_with_panel(preview, f"Tool result: {tool_name} ✓", "green") else: log_with_panel("Success", f"Tool result: {tool_name} ✓", "green") elif status == 'error': # Show error details if tool_name == 'add_post_to_bluesky_reply_thread': error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred" log_with_panel(error_str, f"Bluesky Post ✗", "red") elif tool_name == 'archival_memory_insert': # Skip archival memory insert errors too pass else: error_preview = "" if hasattr(chunk, 'tool_return') and chunk.tool_return: error_str = str(chunk.tool_return) error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red") else: log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red") else: logger.info(f"Tool result: {tool_name} - {status}") elif chunk.message_type == 'assistant_message': # Format with Unicode characters print("\n▶ Assistant Response") print(" ──────────────────") # Indent response text for line in chunk.content.split('\n'): print(f" {line}") else: # Filter out verbose message types if chunk.message_type not in ['usage_statistics', 'stop_reason']: logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...") else: logger.info(f"📦 Stream status: {chunk}") # Log full chunk for debugging logger.debug(f"Full streaming chunk: {chunk}") all_messages.append(chunk) if str(chunk) == 'done': break # Convert streaming response to standard format for compatibility message_response = type('StreamingResponse', (), { 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')] })() except Exception as api_error: import traceback error_str = str(api_error) logger.error(f"Letta API error: {api_error}") logger.error(f"Error type: {type(api_error).__name__}") logger.error(f"Full traceback:\n{traceback.format_exc()}") logger.error(f"Mention text was: {mention_text}") logger.error(f"Author: @{author_handle}") logger.error(f"URI: {uri}") # Try to extract more info from different error types if hasattr(api_error, 'response'): logger.error(f"Error response object exists") if hasattr(api_error.response, 'text'): logger.error(f"Response text: {api_error.response.text}") if hasattr(api_error.response, 'json') and callable(api_error.response.json): try: logger.error(f"Response JSON: {api_error.response.json()}") except: pass # Check for specific error types if hasattr(api_error, 'status_code'): logger.error(f"API Status code: {api_error.status_code}") if hasattr(api_error, 'body'): logger.error(f"API Response body: {api_error.body}") if hasattr(api_error, 'headers'): logger.error(f"API Response headers: {api_error.headers}") if api_error.status_code == 413: logger.error("413 Payload Too Large - moving to errors directory") return None # Move to errors directory - payload is too large to ever succeed elif api_error.status_code == 524: logger.error("524 error - timeout from Cloudflare, will retry later") return False # Keep in queue for retry # Check if error indicates we should remove from queue if 'status_code: 413' in error_str or 'Payload Too Large' in error_str: logger.warning("Payload too large error, moving to errors directory") return None # Move to errors directory - cannot be fixed by retry elif 'status_code: 524' in error_str: logger.warning("524 timeout error, keeping in queue for retry") return False # Keep in queue for retry raise # Log successful response logger.debug("Successfully received 
response from Letta API") logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}") # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response reply_candidates = [] tool_call_results = {} # Map tool_call_id to status ack_note = None # Track any note from annotate_ack tool logger.debug(f"Processing {len(message_response.messages)} response messages...") # First pass: collect tool return statuses ignored_notification = False ignore_reason = "" ignore_category = "" for message in message_response.messages: if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'): if message.name == 'add_post_to_bluesky_reply_thread': tool_call_results[message.tool_call_id] = message.status logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}") elif message.name == 'ignore_notification': # Check if the tool was successful if hasattr(message, 'tool_return') and message.status == 'success': # Parse the return value to extract category and reason result_str = str(message.tool_return) if 'IGNORED_NOTIFICATION::' in result_str: parts = result_str.split('::') if len(parts) >= 3: ignore_category = parts[1] ignore_reason = parts[2] ignored_notification = True logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}") elif message.name == 'bluesky_reply': logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") logger.error("Please use add_post_to_bluesky_reply_thread instead.") logger.error("Update the agent's tools using register_tools.py") # Export agent state before terminating export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") exit(1) # Second pass: process messages and check for successful tool calls for i, message in enumerate(message_response.messages, 1): # Log concise message info instead of full object msg_type = getattr(message, 'message_type', 'unknown') if hasattr(message, 'reasoning') and message.reasoning: logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...") elif hasattr(message, 'tool_call') and message.tool_call: tool_name = message.tool_call.name logger.debug(f" {i}. {msg_type}: {tool_name}") elif hasattr(message, 'tool_return'): tool_name = getattr(message, 'name', 'unknown_tool') return_preview = str(message.tool_return)[:100] if message.tool_return else "None" status = getattr(message, 'status', 'unknown') logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})") elif hasattr(message, 'text'): logger.debug(f" {i}. {msg_type}: {message.text[:100]}...") else: logger.debug(f" {i}. 
{msg_type}: ") # Check for halt_activity tool call if hasattr(message, 'tool_call') and message.tool_call: if message.tool_call.name == 'halt_activity': logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT") try: args = json.loads(message.tool_call.arguments) reason = args.get('reason', 'Agent requested halt') logger.info(f"Halt reason: {reason}") except: logger.info("Halt reason: ") # Delete the queue file before terminating if queue_filepath and queue_filepath.exists(): queue_filepath.unlink() logger.info(f"Deleted queue file: {queue_filepath.name}") # Also mark as processed to avoid reprocessing processed_uris = load_processed_notifications() processed_uris.add(notification_data.get('uri', '')) save_processed_notifications(processed_uris) # Export agent state before terminating export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) # Exit the program logger.info("=== BOT TERMINATED BY AGENT ===") exit(0) # Check for deprecated bluesky_reply tool if hasattr(message, 'tool_call') and message.tool_call: if message.tool_call.name == 'bluesky_reply': logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!") logger.error("Please use add_post_to_bluesky_reply_thread instead.") logger.error("Update the agent's tools using register_tools.py") # Export agent state before terminating export_agent_state(CLIENT, agent, skip_git=SKIP_GIT) logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===") exit(1) # Collect annotate_ack tool calls elif message.tool_call.name == 'annotate_ack': try: args = json.loads(message.tool_call.arguments) note = args.get('note', '') if note: ack_note = note logger.debug(f"Found annotate_ack with note: {note[:50]}...") except json.JSONDecodeError as e: logger.error(f"Failed to parse annotate_ack arguments: {e}") # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful elif message.tool_call.name == 'add_post_to_bluesky_reply_thread': tool_call_id = message.tool_call.tool_call_id tool_status = tool_call_results.get(tool_call_id, 'unknown') if tool_status == 'success': try: args = json.loads(message.tool_call.arguments) reply_text = args.get('text', '') reply_lang = args.get('lang', 'en-US') if reply_text: # Only add if there's actual content reply_candidates.append((reply_text, reply_lang)) logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... 
(lang: {reply_lang})") except json.JSONDecodeError as e: logger.error(f"Failed to parse tool call arguments: {e}") elif tool_status == 'error': logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)") else: logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}") # Check for conflicting tool calls if reply_candidates and ignored_notification: logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!") logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}") logger.warning("Item will be left in queue for manual review") # Return False to keep in queue return False if reply_candidates: # Aggregate reply posts into a thread reply_messages = [] reply_langs = [] for text, lang in reply_candidates: reply_messages.append(text) reply_langs.append(lang) # Use the first language for the entire thread (could be enhanced later) reply_lang = reply_langs[0] if reply_langs else 'en-US' logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread") # Display the generated reply thread if len(reply_messages) == 1: content = reply_messages[0] title = f"Reply to @{author_handle}" else: content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)]) title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)" # Format with Unicode characters print(f"\n✎ {title}") print(f" {'─' * len(title)}") # Indent content lines for line in content.split('\n'): print(f" {line}") # Send the reply(s) with language (unless in testing mode) if testing_mode: logger.info("TESTING MODE: Skipping actual Bluesky post") response = True # Simulate success else: if len(reply_messages) == 1: # Single reply - use existing function cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0]) logger.info(f"Sending single reply: {cleaned_text[:50]}... 
(lang: {reply_lang})") response = bsky_utils.reply_to_notification( client=atproto_client, notification=notification_data, reply_text=cleaned_text, lang=reply_lang ) else: # Multiple replies - use new threaded function cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages] logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})") response = bsky_utils.reply_with_thread_to_notification( client=atproto_client, notification=notification_data, reply_messages=cleaned_messages, lang=reply_lang ) if response: logger.info(f"Successfully replied to @{author_handle}") # Acknowledge the post we're replying to with stream.thought.ack try: post_uri = notification_data.get('uri') post_cid = notification_data.get('cid') if post_uri and post_cid: ack_result = bsky_utils.acknowledge_post( client=atproto_client, post_uri=post_uri, post_cid=post_cid, note=ack_note ) if ack_result: if ack_note: logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")") else: logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack") else: logger.warning(f"Failed to acknowledge post from @{author_handle}") else: logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}") except Exception as e: logger.error(f"Error acknowledging post from @{author_handle}: {e}") # Don't fail the entire operation if acknowledgment fails return True else: logger.error(f"Failed to send reply to @{author_handle}") return False else: # Check if notification was explicitly ignored if ignored_notification: logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})") return "ignored" else: logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder") return "no_reply" except Exception as e: logger.error(f"Error processing mention: {e}") return False finally: # Detach user blocks after agent response (success or failure) if 'attached_handles' in locals() and attached_handles: try: logger.info(f"Detaching user blocks for handles: {attached_handles}") detach_result = detach_user_blocks(attached_handles, agent) logger.debug(f"Detach result: {detach_result}") except Exception as detach_error: logger.warning(f"Failed to detach user blocks: {detach_error}") def notification_to_dict(notification): """Convert a notification object to a dictionary for JSON serialization.""" return { 'uri': notification.uri, 'cid': notification.cid, 'reason': notification.reason, 'is_read': notification.is_read, 'indexed_at': notification.indexed_at, 'author': { 'handle': notification.author.handle, 'display_name': notification.author.display_name, 'did': notification.author.did }, 'record': { 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else '' } } def load_processed_notifications(): """Load the set of processed notification URIs.""" if PROCESSED_NOTIFICATIONS_FILE.exists(): try: with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f: data = json.load(f) # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS) if len(data) > MAX_PROCESSED_NOTIFICATIONS: data = data[-MAX_PROCESSED_NOTIFICATIONS:] save_processed_notifications(data) return set(data) except Exception as e: logger.error(f"Error loading processed notifications: {e}") return set() def save_processed_notifications(processed_set): """Save the set of processed notification URIs.""" 
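# Hedged usage sketch (illustrative only, never called by the bot): the file
# behind load/save_processed_notifications() is a flat JSON list of AT-URI
# strings, trimmed to the most recent MAX_PROCESSED_NOTIFICATIONS entries.
def _example_mark_uri_processed(uri):
    """Minimal sketch of the load -> add -> save pattern used after each queue item."""
    processed = load_processed_notifications()
    processed.add(uri)
    save_processed_notifications(processed)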
def save_processed_notifications(processed_set):
    """Save the set of processed notification URIs."""
    try:
        with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed notifications: {e}")


def save_notification_to_queue(notification, is_priority=None):
    """Save a notification to the queue directory with priority-based filename."""
    try:
        # Check if already processed
        processed_uris = load_processed_notifications()

        # Handle both notification objects and dicts
        if isinstance(notification, dict):
            notif_dict = notification
            notification_uri = notification.get('uri')
        else:
            notif_dict = notification_to_dict(notification)
            notification_uri = notification.uri

        if notification_uri in processed_uris:
            logger.debug(f"Notification already processed: {notification_uri}")
            return False

        # Create JSON string
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Extract author handle
        if isinstance(notification, dict):
            author_handle = notification.get('author', {}).get('handle', '')
        else:
            author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''

        # Check if author is in blocks list
        blocks_file = Path('blocks.txt')
        if blocks_file.exists():
            with open(blocks_file, 'r') as f:
                blocked_handles = [line.strip() for line in f if line.strip() and not line.strip().startswith('#')]
            if author_handle in blocked_handles:
                logger.info(f"Blocking notification from {author_handle} (in blocks.txt)")
                return False

        # Determine priority based on author handle or explicit priority
        if is_priority is not None:
            priority_prefix = "0_" if is_priority else "1_"
        else:
            # Prioritize cameron.pfiffer.org responses
            priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"

        # Create filename with priority, timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        reason = notif_dict.get('reason', 'unknown')
        filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Check if this notification URI is already in the queue
        for existing_file in QUEUE_DIR.glob("*.json"):
            if existing_file.name == "processed_notifications.json":
                continue
            try:
                with open(existing_file, 'r') as f:
                    existing_data = json.load(f)
                if existing_data.get('uri') == notification_uri:
                    logger.debug(f"Notification already queued (URI: {notification_uri})")
                    return False
            except:
                continue

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
        logger.info(f"Queued notification ({priority_label}): {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False


def load_and_process_queued_notifications(agent, atproto_client, testing_mode=False):
    """Load and process all notifications from the queue in priority order."""
    try:
        # Get all JSON files in queue directory (excluding processed_notifications.json)
        # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
        all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])

        # Filter out and delete like notifications immediately
        queue_files = []
        likes_deleted = 0
        for filepath in all_queue_files:
            try:
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # If it's a like, delete it immediately and don't process
                if notif_data.get('reason') == 'like':
                    filepath.unlink()
                    likes_deleted += 1
                    logger.debug(f"Deleted 
like notification: {filepath.name}") else: queue_files.append(filepath) except Exception as e: logger.warning(f"Error checking notification file {filepath.name}: {e}") queue_files.append(filepath) # Keep it in case it's valid if likes_deleted > 0: logger.info(f"Deleted {likes_deleted} like notifications from queue") if not queue_files: return logger.info(f"Processing {len(queue_files)} queued notifications") # Log current statistics elapsed_time = time.time() - start_time total_messages = sum(message_counters.values()) messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min") for i, filepath in enumerate(queue_files, 1): # Determine if this is a priority notification is_priority = filepath.name.startswith("0_") # Check for new notifications periodically during queue processing # Also check immediately after processing each priority item should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1) # If we just processed a priority item, immediately check for new priority notifications if is_priority and i > 1: should_check_notifications = True if should_check_notifications: logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)") try: # Fetch and queue new notifications without processing them new_count = fetch_and_queue_new_notifications(atproto_client) if new_count > 0: logger.info(f"Added {new_count} new notifications to queue") # Reload the queue files to include the new items updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"]) queue_files = updated_queue_files logger.info(f"Queue updated: now {len(queue_files)} total items") except Exception as e: logger.error(f"Error checking for new notifications: {e}") priority_label = " [PRIORITY]" if is_priority else "" logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}") try: # Load notification data with open(filepath, 'r') as f: notif_data = json.load(f) # Process based on type using dict data directly success = False if notif_data['reason'] == "mention": success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) if success: message_counters['mentions'] += 1 elif notif_data['reason'] == "reply": success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode) if success: message_counters['replies'] += 1 elif notif_data['reason'] == "follow": # Skip logging.info("Skipping new follower notification, currently disabled") author_handle = notif_data['author']['handle'] author_display_name = notif_data['author'].get('display_name', 'no display name') follow_update = f"@{author_handle} ({author_display_name}) started following you." 
follow_message = f"Update: {follow_update}" logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars") try: # Use streaming to match other notification processing message_stream = CLIENT.agents.messages.create_stream( agent_id=agent.id, messages=[{"role": "user", "content": follow_message}], stream_tokens=False, max_steps=50 # Fewer steps needed for simple follow updates ) # Process the streaming response for chunk in message_stream: # Basic processing - just consume the stream if hasattr(chunk, 'message_type'): if chunk.message_type == 'reasoning_message': logger.debug(f"Follow update reasoning: {chunk.reasoning[:100]}...") elif chunk.message_type == 'tool_call_message': logger.debug(f"Follow update tool call: {chunk.tool_call.name}") elif chunk.message_type == 'assistant_message': logger.debug(f"Follow update response: {chunk.content[:100]}...") if str(chunk) == 'done': break success = True # Follow updates are successful if streaming completes logger.debug(f"Successfully processed follow notification from @{author_handle}") except Exception as follow_error: logger.error(f"Error processing follow notification from @{author_handle}: {follow_error}") success = False # Mark as failed so it can be retried if success: message_counters['follows'] += 1 elif notif_data['reason'] == "repost": # Skip reposts silently success = True # Skip reposts but mark as successful to remove from queue if success: message_counters['reposts_skipped'] += 1 elif notif_data['reason'] == "like": # Skip likes silently success = True # Skip likes but mark as successful to remove from queue if success: message_counters.setdefault('likes_skipped', 0) message_counters['likes_skipped'] += 1 else: logger.warning(f"Unknown notification type: {notif_data['reason']}") success = True # Remove unknown types from queue # Handle file based on processing result if success: if testing_mode: logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}") else: filepath.unlink() logger.info(f"Successfully processed and removed: {filepath.name}") # Mark as processed to avoid reprocessing processed_uris = load_processed_notifications() processed_uris.add(notif_data['uri']) save_processed_notifications(processed_uris) elif success is None: # Special case for moving to error directory error_path = QUEUE_ERROR_DIR / filepath.name filepath.rename(error_path) logger.warning(f"Moved {filepath.name} to errors directory") # Also mark as processed to avoid retrying processed_uris = load_processed_notifications() processed_uris.add(notif_data['uri']) save_processed_notifications(processed_uris) elif success == "no_reply": # Special case for moving to no_reply directory no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name filepath.rename(no_reply_path) logger.info(f"Moved {filepath.name} to no_reply directory") # Also mark as processed to avoid retrying processed_uris = load_processed_notifications() processed_uris.add(notif_data['uri']) save_processed_notifications(processed_uris) elif success == "ignored": # Special case for explicitly ignored notifications # For ignored notifications, we just delete them (not move to no_reply) filepath.unlink() logger.info(f"🚫 Deleted ignored notification: {filepath.name}") # Also mark as processed to avoid retrying processed_uris = load_processed_notifications() processed_uris.add(notif_data['uri']) save_processed_notifications(processed_uris) else: logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry") except Exception as e: 
logger.error(f"💥 Error processing queued notification {filepath.name}: {e}") # Keep the file for retry later except Exception as e: logger.error(f"Error loading queued notifications: {e}") def fetch_and_queue_new_notifications(atproto_client): """Fetch new notifications and queue them without processing.""" try: # Get current time for marking notifications as seen logger.debug("Getting current time for notification marking...") last_seen_at = atproto_client.get_current_time_iso() # Fetch ALL notifications using pagination all_notifications = [] cursor = None page_count = 0 max_pages = 20 # Safety limit to prevent infinite loops while page_count < max_pages: try: # Fetch notifications page if cursor: notifications_response = atproto_client.app.bsky.notification.list_notifications( params={'cursor': cursor, 'limit': 100} ) else: notifications_response = atproto_client.app.bsky.notification.list_notifications( params={'limit': 100} ) page_count += 1 page_notifications = notifications_response.notifications if not page_notifications: break all_notifications.extend(page_notifications) # Check if there are more pages cursor = getattr(notifications_response, 'cursor', None) if not cursor: break except Exception as e: logger.error(f"Error fetching notifications page {page_count}: {e}") break # Now process all fetched notifications new_count = 0 if all_notifications: # Queue all new notifications (except likes and already read) BEFORE marking as seen for notif in all_notifications: # Skip if already read or if it's a like if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'): continue notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif # Skip likes in dict form too if notif_dict.get('reason') == 'like': continue # Check if it's a priority notification is_priority = False # Priority for cameron.pfiffer.org notifications author_handle = notif_dict.get('author', {}).get('handle', '') if author_handle == "cameron.pfiffer.org": is_priority = True # Also check for priority keywords in mentions if notif_dict.get('reason') == 'mention': # Get the mention text to check for priority keywords record = notif_dict.get('record', {}) text = record.get('text', '') if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']): is_priority = True if save_notification_to_queue(notif_dict, is_priority=is_priority): new_count += 1 # Mark as seen AFTER queueing try: atproto_client.app.bsky.notification.update_seen( data={'seenAt': last_seen_at} ) logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}") except Exception as e: logger.error(f"Error marking notifications as seen: {e}") if new_count > 0: logger.info(f"Queued {new_count} new notifications and marked as seen") else: logger.debug("No new notifications to queue") return new_count except Exception as e: logger.error(f"Error fetching and queueing notifications: {e}") return 0 def process_notifications(agent, atproto_client, testing_mode=False): """Fetch new notifications, queue them, and process the queue.""" try: # Fetch and queue new notifications new_count = fetch_and_queue_new_notifications(atproto_client) if new_count > 0: logger.info(f"Found {new_count} new notifications to process") # Now process the entire queue (old + new notifications) load_and_process_queued_notifications(agent, atproto_client, testing_mode) except Exception as e: logger.error(f"Error processing notifications: {e}") def 
send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
    """
    Send a synthesis message to the agent every 10 minutes.
    This prompts the agent to synthesize its recent experiences.

    Args:
        client: Letta client
        agent_id: Agent ID to send synthesis to
        atproto_client: Optional AT Protocol client for posting synthesis results
    """
    # Track attached temporal blocks for cleanup
    attached_temporal_labels = []

    try:
        logger.info("🧠 Preparing synthesis with temporal journal blocks")

        # Attach temporal blocks before synthesis
        success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
        if not success:
            logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")

        # Create enhanced synthesis prompt
        today = date.today()

        # Get the agent-scoped temporal labels
        from temporal_blocks import get_temporal_labels
        day_label, month_label, year_label = get_temporal_labels(today, agent_id)

        synthesis_prompt = f"""# Synthesis loop

You are now in synthesis time, a free cycle to consider whatever you wish. You will periodically be placed into synthesis mode, particularly when there are no messages to respond to. The time between synthesis cycles can vary.

## Journaling

You have access to temporal journal blocks for recording your thoughts and experiences:

- {day_label}: Today's journal ({today.strftime('%B %d, %Y')})
- {month_label}: This month's journal ({today.strftime('%B %Y')})
- {year_label}: This year's journal ({today.year})

These journal blocks are attached temporarily for this synthesis session. Use them to:

1. Record significant interactions and insights from recent experiences
2. Track patterns in conversations and user behaviors
3. Note your evolving understanding of the digital social environment
4. Reflect on your growth and changes in perspective
5. Document memorable moments or interesting discoveries

Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).

You may tidy up your journaling blocks as needed. Don't let them fill with clutter, duplicate content, or multiple lines when a single line may be edited to contain the information you need.

## Cleaning

Synthesis time is an opportunity to clean, compact, or otherwise tidy your memory blocks. You may remove redundant or unclear language as needed, offload information to archival memory, etc.

Cleaning blocks can be done by:

1. Using memory_replace to make small or precise edits
2. Using memory_rethink to overwrite the block. Take care doing this, as you can lose critical information if not careful.

## Posting and communication

You may use synthesis time to communicate externally by creating bluesky timeline posts or by blogging. Either are sufficient.

However, you should take care to not post redundant things -- it is easy for you to forget what you have recently posted. Make sure to note down in one of your journal blocks what you wrote about to prevent multiple or duplicate posts.

Synthesis time is important to your functioning. It is a period of rest and reflection.
Begin your synthesis and journaling now.""" logger.info("🧠 Sending enhanced synthesis prompt to agent") # Send synthesis message with streaming to show tool use message_stream = client.agents.messages.create_stream( agent_id=agent_id, messages=[{"role": "user", "content": synthesis_prompt}], stream_tokens=False, max_steps=100 ) # Track synthesis content for potential posting synthesis_posts = [] ack_note = None # Process the streaming response for chunk in message_stream: if hasattr(chunk, 'message_type'): if chunk.message_type == 'reasoning_message': if SHOW_REASONING: print("\n◆ Reasoning") print(" ─────────") for line in chunk.reasoning.split('\n'): print(f" {line}") # Create ATProto record for reasoning (if we have atproto client) if atproto_client and hasattr(chunk, 'reasoning'): try: bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning) except Exception as e: logger.debug(f"Failed to create reasoning record during synthesis: {e}") elif chunk.message_type == 'tool_call_message': tool_name = chunk.tool_call.name # Create ATProto record for tool call (if we have atproto client) if atproto_client: try: tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None bsky_utils.create_tool_call_record( atproto_client, tool_name, chunk.tool_call.arguments, tool_call_id ) except Exception as e: logger.debug(f"Failed to create tool call record during synthesis: {e}") try: args = json.loads(chunk.tool_call.arguments) if tool_name == 'archival_memory_search': query = args.get('query', 'unknown') log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue") elif tool_name == 'archival_memory_insert': content = args.get('content', '') log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue") elif tool_name == 'update_block': label = args.get('label', 'unknown') value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', '')) log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue") elif tool_name == 'annotate_ack': note = args.get('note', '') if note: ack_note = note log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue") elif tool_name == 'add_post_to_bluesky_reply_thread': text = args.get('text', '') synthesis_posts.append(text) log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue") else: args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat') if len(args_str) > 150: args_str = args_str[:150] + "..." 
log_with_panel(args_str, f"Tool call: {tool_name}", "blue") except: log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue") elif chunk.message_type == 'tool_return_message': if chunk.status == 'success': log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green") else: log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red") elif chunk.message_type == 'assistant_message': print("\n▶ Synthesis Response") print(" ──────────────────") for line in chunk.content.split('\n'): print(f" {line}") if str(chunk) == 'done': break logger.info("🧠 Synthesis message processed successfully") # Handle synthesis acknowledgments if we have an atproto client if atproto_client and ack_note: try: result = bsky_utils.create_synthesis_ack(atproto_client, ack_note) if result: logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...") else: logger.warning("Failed to create synthesis acknowledgment") except Exception as e: logger.error(f"Error creating synthesis acknowledgment: {e}") # Handle synthesis posts if any were generated if atproto_client and synthesis_posts: try: for post_text in synthesis_posts: cleaned_text = bsky_utils.remove_outside_quotes(post_text) response = bsky_utils.send_post(atproto_client, cleaned_text) if response: logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...") else: logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...") except Exception as e: logger.error(f"Error posting synthesis content: {e}") except Exception as e: logger.error(f"Error sending synthesis message: {e}") finally: # Update temporal blocks in ATProto and detach after synthesis if attached_temporal_labels: logger.info("🧠 Syncing temporal blocks to ATProto repository") update_temporal_blocks_after_synthesis(client, agent_id, attached_temporal_labels) logger.info("🧠 Detaching temporal journal blocks after synthesis") detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels) if not detach_success: logger.warning("Some temporal blocks may not have been detached properly") def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None: """ Detach all user blocks from the agent to prevent memory bloat. This should be called periodically to ensure clean state. 
""" try: # Get all blocks attached to the agent attached_blocks = client.agents.blocks.list(agent_id=agent_id) user_blocks_to_detach = [] for block in attached_blocks: if hasattr(block, 'label') and block.label.startswith('user_'): user_blocks_to_detach.append({ 'label': block.label, 'id': block.id }) if not user_blocks_to_detach: logger.debug("No user blocks found to detach during periodic cleanup") return # Detach each user block detached_count = 0 for block_info in user_blocks_to_detach: try: client.agents.blocks.detach( agent_id=agent_id, block_id=str(block_info['id']) ) detached_count += 1 logger.debug(f"Detached user block: {block_info['label']}") except Exception as e: logger.warning(f"Failed to detach block {block_info['label']}: {e}") if detached_count > 0: logger.info(f"Periodic cleanup: Detached {detached_count} user blocks") except Exception as e: logger.error(f"Error during periodic user block cleanup: {e}") # Temporal block functions have been moved to temporal_blocks.py # The imported functions handle ATProto synchronization automatically # Temporal block functions have been moved to temporal_blocks.py # The imported functions handle ATProto synchronization automatically def main(): # Parse command line arguments parser = argparse.ArgumentParser(description='Comind - Bluesky autonomous agent') parser.add_argument('--config', type=str, default='config.yaml', help='Path to configuration file (default: config.yaml)') parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)') parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state') parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)') # --rich option removed as we now use simple text formatting parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO') parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)') parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)') parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)') args = parser.parse_args() # Configure logging based on command line arguments if args.simple_logs: log_format = "comind - %(levelname)s - %(message)s" else: # Create custom formatter with symbols class SymbolFormatter(logging.Formatter): """Custom formatter that adds symbols for different log levels""" SYMBOLS = { logging.DEBUG: '◇', logging.INFO: '✓', logging.WARNING: '⚠', logging.ERROR: '✗', logging.CRITICAL: '‼' } def format(self, record): # Get the symbol for this log level symbol = self.SYMBOLS.get(record.levelno, '•') # Format time as HH:MM:SS timestamp = self.formatTime(record, "%H:%M:%S") # Build the formatted message level_name = f"{record.levelname:<5}" # Left-align, 5 chars # Use vertical bar as separator parts = [symbol, timestamp, '│', level_name, '│', record.getMessage()] return ' '.join(parts) # Reset logging configuration for handler in logging.root.handlers[:]: logging.root.removeHandler(handler) # Create handler with custom formatter handler = logging.StreamHandler() if not args.simple_logs: handler.setFormatter(SymbolFormatter()) else: 
    # Configure root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(handler)

    global logger, prompt_logger
    logger = logging.getLogger("comind_bot")
    logger.setLevel(logging.INFO)

    # Create a separate logger for prompts (set to WARNING to hide by default)
    prompt_logger = logging.getLogger("comind_bot.prompts")
    if args.reasoning:
        prompt_logger.setLevel(logging.INFO)  # Show reasoning when --reasoning is used
    else:
        prompt_logger.setLevel(logging.WARNING)  # Hide by default

    # Disable httpx logging completely
    logging.getLogger("httpx").setLevel(logging.CRITICAL)

    # Rich console no longer used - simple text formatting

    global TESTING_MODE, SKIP_GIT, SHOW_REASONING
    TESTING_MODE = args.test

    # Store no-git flag globally for use in export_agent_state calls
    SKIP_GIT = args.no_git

    # Store reasoning flag globally
    SHOW_REASONING = args.reasoning

    if TESTING_MODE:
        logger.info("=== RUNNING IN TESTING MODE ===")
        logger.info(" - No messages will be sent to Bluesky")
        logger.info(" - Queue files will not be deleted")
        logger.info(" - Notifications will not be marked as seen")
        print("\n")

    # Check for synthesis-only mode
    SYNTHESIS_ONLY = args.synthesis_only
    if SYNTHESIS_ONLY:
        logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
        logger.info(" - Only synthesis messages will be sent")
        logger.info(" - No notification processing")
        logger.info(" - No Bluesky client needed")
        print("\n")

    # Initialize configuration and Letta client with the specified config file
    global CLIENT, PROJECT_ID
    from config_loader import get_config, get_letta_config

    # Load configuration from the specified file
    config = get_config(args.config)
    letta_config = get_letta_config(args.config)

    # Create Letta client with configuration
    CLIENT = Letta(
        token=letta_config['api_key'],
        timeout=letta_config['timeout']  # 10 minutes timeout for API calls
    )
    PROJECT_ID = letta_config['project_id']
    logger.info(f"Configuration loaded from: {args.config}")

    # Main bot loop that continuously monitors for notifications.
    global start_time
    start_time = time.time()

    # Startup banner (the original multi-line ASCII art spelling "comind" lost its
    # alignment in formatting, so a plain banner is logged instead)
    logger.info("=== comind ===")

    agent = initialize_void(args.config)
    logger.info(f"Agent initialized: {agent.id}")

    # Ensure correct tools are attached for Bluesky
    logger.info("Configuring tools for Bluesky platform...")
    try:
        from tool_manager import ensure_platform_tools
        ensure_platform_tools('bluesky', agent.id)
    except Exception as e:
        logger.error(f"Failed to configure platform tools: {e}")
        logger.warning("Continuing with existing tool configuration")

    # Check if agent has required tools
    if hasattr(agent, 'tools') and agent.tools:
        tool_names = [tool.name for tool in agent.tools]
        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Clean up all user blocks at startup
    logger.info("🧹 Cleaning up user blocks at startup...")
    periodic_user_block_cleanup(CLIENT, agent.id)

    # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
    if not SYNTHESIS_ONLY:
        atproto_client = bsky_utils.default_login(args.config)
        logger.info("Connected to Bluesky")
    else:
        # In synthesis-only mode, still connect for acks and posts (unless in test mode)
        if not args.test:
            atproto_client = bsky_utils.default_login(args.config)
            logger.info("Connected to Bluesky (for synthesis acks/posts)")
        else:
            atproto_client = None
            logger.info("Skipping Bluesky connection (test mode)")

    # Configure intervals
    CLEANUP_INTERVAL = args.cleanup_interval
    SYNTHESIS_INTERVAL = args.synthesis_interval

    # Synthesis-only mode
    if SYNTHESIS_ONLY:
        if SYNTHESIS_INTERVAL <= 0:
            logger.error("Synthesis-only mode requires --synthesis-interval > 0")
            return

        logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
        while True:
            try:
                # Send synthesis message immediately on first run
                logger.info("🧠 Sending synthesis message")
                send_synthesis_message(CLIENT, agent.id, atproto_client)

                # Wait for next interval
                logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
                sleep(SYNTHESIS_INTERVAL)
            except KeyboardInterrupt:
                logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
                break
            except Exception as e:
                logger.error(f"Error in synthesis loop: {e}")
                logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
                sleep(SYNTHESIS_INTERVAL)
        # Synthesis-only mode never falls through to the notification loop
        return

    # Normal mode with notification processing
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
    cycle_count = 0

    if CLEANUP_INTERVAL > 0:
        logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
    else:
        logger.info("User block cleanup disabled")

    if SYNTHESIS_INTERVAL > 0:
        logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
    else:
        logger.info("Synthesis messages disabled")

    while True:
        try:
            cycle_count += 1
            process_notifications(agent, atproto_client, TESTING_MODE)

            # Check if synthesis interval has passed
            if SYNTHESIS_INTERVAL > 0:
                current_time = time.time()
                global last_synthesis_time
                if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
                    logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
                    send_synthesis_message(CLIENT, agent.id, atproto_client)
                    last_synthesis_time = current_time

            # Run periodic cleanup every N cycles
            if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
                logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
                periodic_user_block_cleanup(CLIENT, agent.id)

            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f" - {message_counters['mentions']} mentions")
            logger.info(f" - {message_counters['replies']} replies")
            logger.info(f" - {message_counters['follows']} follows")
            logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()
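# Example invocations using the flags defined above (illustrative; the script
# filename "bot.py" is an assumption, substitute the actual filename):
#   python bot.py                                               # normal notification loop with defaults
#   python bot.py --test --no-git                               # dry run: nothing posted, no git staging
#   python bot.py --synthesis-only --synthesis-interval 1800    # synthesis messages every 30 minutes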