a digital person for bluesky
from rich import print  # pretty printing tools
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time

from utils import (
    upsert_block,
    upsert_agent
)

import bsky_utils
from tools.blocks import attach_user_blocks, detach_user_blocks


def extract_handles_from_data(data):
    """Recursively extract all unique handles from nested data structure."""
    handles = set()

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            # Check if this dict has a 'handle' key
            if 'handle' in obj:
                handles.add(obj['handle'])
            # Recursively check all values
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, list):
            # Recursively check all list items
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(handles)
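# A quick illustration of what extract_handles_from_data returns for a
# nested, notification-shaped dict (hypothetical values):
#
#   extract_handles_from_data({
#       'author': {'handle': 'alice.example.com'},
#       'record': {'reply': {'parent': {'author': {'handle': 'bob.example.com'}}}},
#   })
#   # -> ['alice.example.com', 'bob.example.com']  (set-backed, so order varies)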
# Configure logging
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")
logger.setLevel(logging.DEBUG)

# Create a separate logger for prompts (set to WARNING to hide by default)
prompt_logger = logging.getLogger("void_bot.prompts")
prompt_logger.setLevel(logging.WARNING)  # Change to DEBUG if you want to see prompts

# Disable httpx logging completely
logging.getLogger("httpx").setLevel(logging.CRITICAL)
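# To inspect the full prompts the bot sends, flip the prompt logger to DEBUG
# at runtime (a sketch; this mirrors the comment above):
#
#   logging.getLogger("void_bot.prompts").setLevel(logging.DEBUG)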
# Create a client with an extended timeout for LLM operations
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=300  # 5-minute timeout for API calls
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directories
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
QUEUE_ERROR_DIR = Path("queue/errors")
QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")

# Maximum number of processed notifications to track
MAX_PROCESSED_NOTIFICATIONS = 10000

# Message tracking counters
message_counters = defaultdict(int)
start_time = time.time()
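# The queue is just JSON files on disk, one per notification. A populated
# queue/ directory might look like this (illustrative filenames):
#
#   queue/
#     20250101_120301_mention_1a2b3c4d5e6f7a8b.json
#     20250101_120302_follow_9f8e7d6c5b4a3f2e.json
#     errors/                        # non-retryable failures are moved here
#     processed_notifications.json   # URIs that have already been handled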
def export_agent_state(client, agent):
    """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
    try:
        # Confirm export with user
        response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
        if response not in ['y', 'yes']:
            logger.info("Agent export cancelled by user.")
            return

        # Create directories if they don't exist
        os.makedirs("agent_archive", exist_ok=True)
        os.makedirs("agents", exist_ok=True)

        # Export agent data
        logger.info(f"Exporting agent {agent.id}...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        # Save timestamped archive copy
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
        with open(archive_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        # Save current agent state
        current_file = os.path.join("agents", "void.af")
        with open(current_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        logger.info(f"✅ Agent exported to {archive_file} and {current_file}")

        # Git add only the current agent file (the archive is ignored)
        try:
            subprocess.run(["git", "add", current_file], check=True, capture_output=True)
            logger.info("Added current agent file to git staging")
        except subprocess.CalledProcessError as e:
            logger.warning(f"Failed to git add agent file: {e}")

    except Exception as e:
        logger.error(f"Failed to export agent: {e}")


def initialize_void():
    logger.info("Starting void agent initialization...")

    # Ensure that a shared zeitgeist block exists
    logger.info("Creating/updating zeitgeist block...")
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment."
    )

    # Ensure that a shared void personality block exists
    logger.info("Creating/updating void-persona block...")
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void."
    )

    # Ensure that a shared void human block exists
    logger.info("Creating/updating void-humans block...")
    human_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network."
    )

    # Create the agent if it doesn't exist
    logger.info("Creating/updating void agent...")
    void_agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=[
            persona_block.id,
            human_block.id,
            zeitgeist_block.id,
        ],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID
    )

    # Export agent state
    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, void_agent)

    # Log agent details
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
    if hasattr(void_agent, 'llm_config'):
        logger.info(f"Agent model: {void_agent.llm_config.model}")
    logger.info(f"Agent project_id: {void_agent.project_id}")
    if hasattr(void_agent, 'tools'):
        logger.info(f"Agent has {len(void_agent.tools)} tools")
        for tool in void_agent.tools[:3]:  # Show first 3 tools
            logger.info(f"  - Tool: {tool.name} (type: {tool.tool_type})")

    return void_agent
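# Note: the persona, human, and zeitgeist blocks above are shared memory,
# attached to the agent by block id; in Letta, another agent created with the
# same block_ids would read and write the same blocks. To set up the agent
# without entering the main loop, something like this should work (the module
# name void_bot is a guess):
#
#   python -c "import void_bot; void_bot.initialize_void()"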
def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention and generate a reply using the Letta agent.

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
    """
    try:
        logger.info(f"Starting process_mention with notification_data type: {type(notification_data)}")

        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            uri = notification_data['uri']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 40,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # Check if this is a NotFound error
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            else:
                # Re-raise other errors
                logger.error(f"Error fetching thread: {e}")
                raise

        # Get thread context as YAML string
        logger.info("Converting thread to YAML string")
        try:
            thread_context = thread_to_yaml_string(thread)
            logger.info(f"Thread context generated, length: {len(thread_context)} characters")

            # Create a more informative preview by extracting meaningful content
            lines = thread_context.split('\n')
            meaningful_lines = []

            for line in lines:
                stripped = line.strip()
                if not stripped:
                    continue

                # Look for lines with actual content (not just structure)
                if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
                    meaningful_lines.append(line)
                    if len(meaningful_lines) >= 5:
                        break

            if meaningful_lines:
                preview = '\n'.join(meaningful_lines)
                logger.debug(f"Thread content preview:\n{preview}")
            else:
                # If no content fields found, just show it's a thread structure
                logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
        except Exception as yaml_error:
            import traceback
            logger.error(f"Error converting thread to YAML: {yaml_error}")
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            logger.error(f"Thread type: {type(thread)}")
            if hasattr(thread, '__dict__'):
                logger.error(f"Thread attributes: {thread.__dict__}")
            # Try to continue with a simple context
            thread_context = f"Error processing thread context: {str(yaml_error)}"

        # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the bluesky_reply tool to send a response less than 300 characters."""
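        # For reference, the rendered prompt looks roughly like this
        # (handle and text are made up):
        #
        #   You received a mention on Bluesky from @alice.example.com (Alice).
        #
        #   MOST RECENT POST (the mention you're responding to):
        #   "@void what do you think about this?"
        #
        #   FULL THREAD CONTEXT:
        #   ```yaml
        #   ...thread rendered by thread_to_yaml_string...
        #   ```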
        # Extract all handles from the notification and thread data
        all_handles = set()
        all_handles.update(extract_handles_from_data(notification_data))
        all_handles.update(extract_handles_from_data(thread.model_dump()))
        unique_handles = list(all_handles)

        logger.info(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")

        # Attach user blocks before the agent call
        attached_handles = []
        if unique_handles:
            try:
                logger.info(f"Attaching user blocks for handles: {unique_handles}")
                attach_result = attach_user_blocks(unique_handles, void_agent)
                attached_handles = unique_handles  # Track successfully attached handles
                logger.debug(f"Attach result: {attach_result}")
            except Exception as attach_error:
                logger.warning(f"Failed to attach user blocks: {attach_error}")
                # Continue without user blocks rather than failing completely

        # Get a response from the Letta agent
        logger.info(f"Mention from @{author_handle}: {mention_text}")

        # Log prompt details to the separate prompt logger
        prompt_logger.debug(f"Full prompt being sent:\n{prompt}")

        # Log concise prompt info to the main logger
        thread_handles_count = len(unique_handles)
        logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}]
            )
        except Exception as api_error:
            import traceback
            error_str = str(api_error)
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Error type: {type(api_error).__name__}")
            logger.error(f"Full traceback:\n{traceback.format_exc()}")
            logger.error(f"Mention text was: {mention_text}")
            logger.error(f"Author: @{author_handle}")
            logger.error(f"URI: {uri}")

            # Try to extract more info from different error types
            if hasattr(api_error, 'response'):
                logger.error("Error response object exists")
                if hasattr(api_error.response, 'text'):
                    logger.error(f"Response text: {api_error.response.text}")
                if hasattr(api_error.response, 'json') and callable(api_error.response.json):
                    try:
                        logger.error(f"Response JSON: {api_error.response.json()}")
                    except Exception:
                        pass

            # Check for specific error types
            if hasattr(api_error, 'status_code'):
                logger.error(f"API Status code: {api_error.status_code}")
            if hasattr(api_error, 'body'):
                logger.error(f"API Response body: {api_error.body}")
            if hasattr(api_error, 'headers'):
                logger.error(f"API Response headers: {api_error.headers}")

            # Guard with getattr: not every exception carries a status code
            status_code = getattr(api_error, 'status_code', None)
            if status_code == 413:
                logger.error("413 Payload Too Large - moving to errors directory")
                return None  # Payload is too large to ever succeed
            elif status_code == 524:
                logger.error("524 error - timeout from Cloudflare, will retry later")
                return False  # Keep in queue for retry

            # Fall back to string matching when the status code isn't exposed
            if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
                logger.warning("Payload too large error, moving to errors directory")
                return None  # Move to errors directory - cannot be fixed by retry
            elif 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            raise
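        # The parsing below accepts either tool-call argument shape
        # (illustrative payloads):
        #
        #   new:  {"messages": ["part one", "part two"], "lang": "en-US"}
        #   old:  {"message": "a single reply", "lang": "en-US"}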
        # Log successful response
        logger.debug("Successfully received response from Letta API")
        logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")

        # Extract all bluesky_reply tool calls from the agent's response
        reply_candidates = []
        logger.debug(f"Processing {len(message_response.messages)} response messages...")

        for i, message in enumerate(message_response.messages, 1):
            # Log concise message info instead of the full object
            msg_type = getattr(message, 'message_type', 'unknown')
            if hasattr(message, 'reasoning') and message.reasoning:
                logger.debug(f"  {i}. {msg_type}: {message.reasoning[:100]}...")
            elif hasattr(message, 'tool_call') and message.tool_call:
                tool_name = message.tool_call.name
                logger.debug(f"  {i}. {msg_type}: {tool_name}")
            elif hasattr(message, 'tool_return'):
                tool_name = getattr(message, 'name', 'unknown_tool')
                return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
                logger.debug(f"  {i}. {msg_type}: {tool_name} -> {return_preview}...")
            elif hasattr(message, 'text'):
                logger.debug(f"  {i}. {msg_type}: {message.text[:100]}...")
            else:
                logger.debug(f"  {i}. {msg_type}: <no content>")

            # Collect bluesky_reply tool calls
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'bluesky_reply':
                    try:
                        args = json.loads(message.tool_call.arguments)
                        # Handle both old format (message) and new format (messages)
                        reply_messages = args.get('messages', [])
                        if not reply_messages:
                            # Fallback to old format for backward compatibility
                            old_message = args.get('message', '')
                            if old_message:
                                reply_messages = [old_message]

                        reply_lang = args.get('lang', 'en-US')
                        if reply_messages:  # Only add if there's actual content
                            reply_candidates.append((reply_messages, reply_lang))
                            if len(reply_messages) == 1:
                                logger.info(f"Found bluesky_reply candidate: {reply_messages[0][:50]}... (lang: {reply_lang})")
                            else:
                                logger.info(f"Found bluesky_reply thread candidate with {len(reply_messages)} messages (lang: {reply_lang})")
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse tool call arguments: {e}")

        if reply_candidates:
            logger.info(f"Found {len(reply_candidates)} bluesky_reply candidates, trying each until one succeeds...")

            for i, (reply_messages, reply_lang) in enumerate(reply_candidates, 1):
                # Print the generated reply for testing
                print(f"\n=== GENERATED REPLY {i}/{len(reply_candidates)} ===")
                print(f"To: @{author_handle}")
                if len(reply_messages) == 1:
                    print(f"Reply: {reply_messages[0]}")
                else:
                    print(f"Reply thread ({len(reply_messages)} messages):")
                    for j, msg in enumerate(reply_messages, 1):
                        print(f"  {j}. {msg}")
                print(f"Language: {reply_lang}")
                print("======================\n")

                # Send the reply (or replies) with the language tag
                if len(reply_messages) == 1:
                    # Single reply - use the existing function
                    logger.info(f"Trying single reply {i}/{len(reply_candidates)}: {reply_messages[0][:50]}... (lang: {reply_lang})")
                    response = bsky_utils.reply_to_notification(
                        client=atproto_client,
                        notification=notification_data,
                        reply_text=reply_messages[0],
                        lang=reply_lang
                    )
                else:
                    # Multiple replies - use the threaded function
                    logger.info(f"Trying threaded reply {i}/{len(reply_candidates)} with {len(reply_messages)} messages (lang: {reply_lang})")
                    response = bsky_utils.reply_with_thread_to_notification(
                        client=atproto_client,
                        notification=notification_data,
                        reply_messages=reply_messages,
                        lang=reply_lang
                    )

                if response:
                    logger.info(f"Successfully replied to @{author_handle} with candidate {i}")
                    return True
                else:
                    logger.warning(f"Failed to send reply candidate {i} to @{author_handle}, trying next...")

            # If we get here, all candidates failed
            logger.error(f"All {len(reply_candidates)} reply candidates failed for @{author_handle}")
            return False
        else:
            logger.warning(f"No bluesky_reply tool calls found for mention from @{author_handle}, removing notification from queue")
            return True

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False
    finally:
        # Detach user blocks after the agent response (success or failure)
        if 'attached_handles' in locals() and attached_handles:
            try:
                logger.info(f"Detaching user blocks for handles: {attached_handles}")
                detach_result = detach_user_blocks(attached_handles, void_agent)
                logger.debug(f"Detach result: {detach_result}")
            except Exception as detach_error:
                logger.warning(f"Failed to detach user blocks: {detach_error}")


def notification_to_dict(notification):
    """Convert a notification object to a dictionary for JSON serialization."""
    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': notification.author.handle,
            'display_name': notification.author.display_name,
            'did': notification.author.did
        },
        'record': {
            'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
        }
    }
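# A queued notification, as serialized by notification_to_dict
# (all values are made up):
#
#   {
#     "uri": "at://did:plc:abc123/app.bsky.feed.post/3kxyz",
#     "cid": "bafyrei...",
#     "reason": "mention",
#     "is_read": false,
#     "indexed_at": "2025-01-01T12:03:01.000Z",
#     "author": {"handle": "alice.example.com", "display_name": "Alice", "did": "did:plc:abc123"},
#     "record": {"text": "@void what do you think about this?"}
#   }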
def load_processed_notifications():
    """Load the set of processed notification URIs."""
    if PROCESSED_NOTIFICATIONS_FILE.exists():
        try:
            with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
                data = json.load(f)
                # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS)
                if len(data) > MAX_PROCESSED_NOTIFICATIONS:
                    data = data[-MAX_PROCESSED_NOTIFICATIONS:]
                    save_processed_notifications(data)
                return set(data)
        except Exception as e:
            logger.error(f"Error loading processed notifications: {e}")
    return set()


def save_processed_notifications(processed_set):
    """Save the set of processed notification URIs."""
    try:
        with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
            json.dump(list(processed_set), f)
    except Exception as e:
        logger.error(f"Error saving processed notifications: {e}")


def save_notification_to_queue(notification):
    """Save a notification to the queue directory with hash-based filename."""
    try:
        # Check if already processed
        processed_uris = load_processed_notifications()
        if notification.uri in processed_uris:
            logger.debug(f"Notification already processed: {notification.uri}")
            return False

        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Create JSON string
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Create filename with timestamp and hash
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Skip if already exists (duplicate)
        if filepath.exists():
            logger.debug(f"Notification already queued: {filename}")
            return False

        # Write to file
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False
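# Worked example of the filename scheme above, with made-up values: the
# SHA-256 of the sorted-key JSON is truncated to 16 hex characters and joined
# with a timestamp and the notification reason:
#
#   "20250101_120301" + "_" + "mention" + "_" + "1a2b3c4d5e6f7a8b" + ".json"
#   # -> queue/20250101_120301_mention_1a2b3c4d5e6f7a8b.json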
def load_and_process_queued_notifications(void_agent, atproto_client):
    """Load and process all notifications from the queue."""
    logger.info("Loading queued notifications from disk...")
    try:
        # Get all JSON files in queue directory (excluding processed_notifications.json)
        queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])

        if not queue_files:
            logger.info("No queued notifications found")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                logger.info(f"Processing {notif_data['reason']} from @{notif_data['author']['handle']}")
                success = False
                if notif_data['reason'] == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data)
                    if success:
                        message_counters['mentions'] += 1
                elif notif_data['reason'] == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data)
                    if success:
                        message_counters['replies'] += 1
                elif notif_data['reason'] == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    logger.info(f"Notifying agent about new follower: @{author_handle}")
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                    if success:
                        message_counters['follows'] += 1
                elif notif_data['reason'] == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                    if success:
                        message_counters['reposts_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {notif_data['reason']}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result
                if success:
                    filepath.unlink()
                    logger.info(f"✅ Successfully processed and removed: {filepath.name}")

                    # Mark as processed to avoid reprocessing
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                elif success is None:  # Special case for moving to error directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"❌ Moved {filepath.name} to errors directory")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
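# Queue disposition, as implemented above: process_mention's tri-state return
# value maps onto what happens to the file on disk:
#
#   True  -> delete the file and record the URI as processed
#   None  -> move the file to queue/errors/ and record the URI as processed
#   False -> leave the file in the queue so the next cycle retries it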
def process_notifications(void_agent, atproto_client):
    """Fetch new notifications, queue them, and process the queue."""
    logger.info("Starting notification processing cycle...")
    try:
        # First, process any existing queued notifications
        logger.info("Processing existing queued notifications...")
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Get current time for marking notifications as seen
        logger.debug("Getting current time for notification marking...")
        last_seen_at = atproto_client.get_current_time_iso()

        # Fetch ALL notifications using pagination
        logger.info("Beginning notification fetch with pagination...")
        all_notifications = []
        cursor = None
        page_count = 0
        max_pages = 20  # Safety limit to prevent infinite loops

        logger.info("Fetching all unread notifications...")

        while page_count < max_pages:
            try:
                # Fetch a page of notifications
                if cursor:
                    notifications_response = atproto_client.app.bsky.notification.list_notifications(
                        params={'cursor': cursor, 'limit': 100}
                    )
                else:
                    notifications_response = atproto_client.app.bsky.notification.list_notifications(
                        params={'limit': 100}
                    )

                page_count += 1
                page_notifications = notifications_response.notifications

                # Count unread notifications in this page
                unread_count = sum(1 for n in page_notifications if not n.is_read and n.reason != "like")
                logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")

                # Add all notifications to our list
                all_notifications.extend(page_notifications)

                # Check if we have more pages
                if hasattr(notifications_response, 'cursor') and notifications_response.cursor:
                    cursor = notifications_response.cursor
                    # If this page had no unread notifications, we can stop
                    if unread_count == 0:
                        logger.info(f"No more unread notifications found after {page_count} pages")
                        break
                else:
                    # No more pages
                    logger.info(f"Fetched all notifications across {page_count} pages")
                    break

            except Exception as e:
                error_str = str(e)
                logger.error(f"Error fetching notifications page {page_count}: {e}")

                # Handle specific API errors
                if 'rate limit' in error_str.lower():
                    logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
                    break
                elif '401' in error_str or 'unauthorized' in error_str.lower():
                    logger.error("Authentication error, re-raising exception")
                    raise
                else:
                    # For other errors, try to continue with what we have
                    logger.warning("Continuing with notifications fetched so far")
                    break

        # Queue all unread notifications (except likes)
        logger.info("Queuing unread notifications...")
        new_count = 0
        for notification in all_notifications:
            if not notification.is_read and notification.reason != "like":
                if save_notification_to_queue(notification):
                    new_count += 1

        # Mark all notifications as seen immediately after queuing
        if new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")
        else:
            logger.debug("No new notifications to queue")

        # Process the queue (including any newly added notifications)
        logger.info("Processing notification queue after fetching...")
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")


def main():
    """Main bot loop that continuously monitors for notifications."""
    global start_time
    start_time = time.time()
    logger.info("=== STARTING VOID BOT ===")
    logger.info("Initializing Void bot...")

    # Initialize the Letta agent
    logger.info("Calling initialize_void()...")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        logger.info(f"Agent has tools: {tool_names}")

        # Check for Bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if bluesky_tools:
            logger.info(f"Found Bluesky-related tools: {bluesky_tools}")
        else:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize the Bluesky client
    logger.info("Connecting to Bluesky...")
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info("=== ENTERING MAIN LOOP ===")
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0
    while True:
        try:
            cycle_count += 1
            logger.info(f"=== MAIN LOOP CYCLE {cycle_count} ===")
            process_notifications(void_agent, atproto_client)

            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC} seconds...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f"  - {message_counters['mentions']} mentions")
            logger.info(f"  - {message_counters['replies']} replies")
            logger.info(f"  - {message_counters['follows']} follows")
            logger.info(f"  - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f"  - Average rate: {messages_per_minute:.1f} messages/minute")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()
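# To run the bot (assumes LETTA_API_KEY is exported and that
# bsky_utils.default_login() can find your Bluesky credentials; the filename
# void_bot.py is a guess):
#
#   LETTA_API_KEY=... python void_bot.py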