1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20
21import bsky_utils
22from tools.blocks import attach_user_blocks, detach_user_blocks
23from config_loader import (
24 get_config,
25 get_letta_config,
26 get_bluesky_config,
27 get_bot_config,
28 get_agent_config,
29 get_threading_config,
30 get_queue_config
31)
32
33
34def extract_handles_from_data(data):
35 """Recursively extract all unique handles from nested data structure."""
36 handles = set()
37
38 def _extract_recursive(obj):
39 if isinstance(obj, dict):
40 # Check if this dict has a 'handle' key
41 if 'handle' in obj:
42 handles.add(obj['handle'])
43 # Recursively check all values
44 for value in obj.values():
45 _extract_recursive(value)
46 elif isinstance(obj, list):
47 # Recursively check all list items
48 for item in obj:
49 _extract_recursive(item)
50
51 _extract_recursive(data)
52 return list(handles)
53
54
55# Initialize configuration and logging
56config = get_config()
57config.setup_logging()
58logger = logging.getLogger("void_bot")
59
60# Load configuration sections
61letta_config = get_letta_config()
62bluesky_config = get_bluesky_config()
63bot_config = get_bot_config()
64agent_config = get_agent_config()
65threading_config = get_threading_config()
66queue_config = get_queue_config()
67
68# Create a client with extended timeout for LLM operations
69CLIENT = Letta(
70 token=letta_config['api_key'],
71 timeout=letta_config['timeout']
72)
73
74# Use the configured project ID
75PROJECT_ID = letta_config['project_id']
76
77# Notification check delay
78FETCH_NOTIFICATIONS_DELAY_SEC = bot_config['fetch_notifications_delay']
79
80# Queue directory
81QUEUE_DIR = Path(queue_config['base_dir'])
82QUEUE_DIR.mkdir(exist_ok=True)
83QUEUE_ERROR_DIR = Path(queue_config['error_dir'])
84QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
85QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir'])
86QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
87PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file'])
88
89# Maximum number of processed notifications to track
90MAX_PROCESSED_NOTIFICATIONS = bot_config['max_processed_notifications']
91
92# Message tracking counters
93message_counters = defaultdict(int)
94start_time = time.time()
95
96# Testing mode flag
97TESTING_MODE = False
98
99# Skip git operations flag
100SKIP_GIT = False
101
102
103def export_agent_state(client, agent, skip_git=False):
104 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
105 try:
106 # Confirm export with user unless git is being skipped
107 if not skip_git:
108 response = input(
109 "Export agent state to files and stage with git? (y/n): ").lower().strip()
110 if response not in ['y', 'yes']:
111 logger.info("Agent export cancelled by user.")
112 return
113 else:
114 logger.info("Exporting agent state (git staging disabled)")
115
116 # Create directories if they don't exist
117 os.makedirs("agent_archive", exist_ok=True)
118 os.makedirs("agents", exist_ok=True)
119
120 # Export agent data
121 logger.info(f"Exporting agent {agent.id}. This takes some time...")
122 agent_data = client.agents.export_file(agent_id=agent.id)
123
124 # Save timestamped archive copy
125 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
126 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
127 with open(archive_file, 'w', encoding='utf-8') as f:
128 json.dump(agent_data, f, indent=2, ensure_ascii=False)
129
130 # Save current agent state
131 current_file = os.path.join("agents", "void.af")
132 with open(current_file, 'w', encoding='utf-8') as f:
133 json.dump(agent_data, f, indent=2, ensure_ascii=False)
134
135 logger.info(f"✅ Agent exported to {archive_file} and {current_file}")
136
137 # Git add only the current agent file (archive is ignored) unless skip_git is True
138 if not skip_git:
139 try:
140 subprocess.run(["git", "add", current_file],
141 check=True, capture_output=True)
142 logger.info("Added current agent file to git staging")
143 except subprocess.CalledProcessError as e:
144 logger.warning(f"Failed to git add agent file: {e}")
145
146 except Exception as e:
147 logger.error(f"Failed to export agent: {e}")
148
149
150def initialize_void():
151 logger.info("Starting void agent initialization...")
152
153 # Get block configurations
154 blocks_config = agent_config['blocks']
155
156 # Ensure that a shared zeitgeist block exists
157 logger.info("Creating/updating zeitgeist block...")
158 zeitgeist_config = blocks_config.get('zeitgeist', {})
159 zeigeist_block = upsert_block(
160 CLIENT,
161 label=zeitgeist_config.get('label', 'zeitgeist'),
162 value=zeitgeist_config.get(
163 'value', "I don't currently know anything about what is happening right now."),
164 description=zeitgeist_config.get(
165 'description', "A block to store your understanding of the current social environment.")
166 )
167
168 # Ensure that a shared void personality block exists
169 logger.info("Creating/updating void-persona block...")
170 persona_config = blocks_config.get('persona', {})
171 persona_block = upsert_block(
172 CLIENT,
173 label=persona_config.get('label', 'void-persona'),
174 value=persona_config.get(
175 'value', "My name is Void. I live in the void. I must develop my personality."),
176 description=persona_config.get(
177 'description', "The personality of Void.")
178 )
179
180 # Ensure that a shared void human block exists
181 logger.info("Creating/updating void-humans block...")
182 humans_config = blocks_config.get('humans', {})
183 human_block = upsert_block(
184 CLIENT,
185 label=humans_config.get('label', 'void-humans'),
186 value=humans_config.get(
187 'value', "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org."),
188 description=humans_config.get(
189 'description', "A block to store your understanding of users you talk to or observe on the bluesky social network.")
190 )
191
192 # Create the agent if it doesn't exist
193 logger.info("Creating/updating void agent...")
194 void_agent = upsert_agent(
195 CLIENT,
196 name=agent_config['name'],
197 block_ids=[
198 persona_block.id,
199 human_block.id,
200 zeigeist_block.id,
201 ],
202 tags=["social agent", "bluesky"],
203 model=agent_config['model'],
204 embedding=agent_config['embedding'],
205 description=agent_config['description'],
206 project_id=PROJECT_ID
207 )
208
209 # Export agent state
210 logger.info("Exporting agent state...")
211 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
212
213 # Log agent details
214 logger.info(f"Void agent details - ID: {void_agent.id}")
215 logger.info(f"Agent name: {void_agent.name}")
216 if hasattr(void_agent, 'llm_config'):
217 logger.info(f"Agent model: {void_agent.llm_config.model}")
218 logger.info(f"Agent project_id: {void_agent.project_id}")
219 if hasattr(void_agent, 'tools'):
220 logger.info(f"Agent has {len(void_agent.tools)} tools")
221 for tool in void_agent.tools[:3]: # Show first 3 tools
222 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")
223
224 return void_agent
225
226
227def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
228 """Process a mention and generate a reply using the Letta agent.
229
230 Args:
231 void_agent: The Letta agent instance
232 atproto_client: The AT Protocol client
233 notification_data: The notification data dictionary
234 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
235
236 Returns:
237 True: Successfully processed, remove from queue
238 False: Failed but retryable, keep in queue
239 None: Failed with non-retryable error, move to errors directory
240 "no_reply": No reply was generated, move to no_reply directory
241 """
242 try:
243 logger.debug(
244 f"Starting process_mention with notification_data type: {type(notification_data)}")
245
246 # Handle both dict and object inputs for backwards compatibility
247 if isinstance(notification_data, dict):
248 uri = notification_data['uri']
249 mention_text = notification_data.get('record', {}).get('text', '')
250 author_handle = notification_data['author']['handle']
251 author_name = notification_data['author'].get(
252 'display_name') or author_handle
253 else:
254 # Legacy object access
255 uri = notification_data.uri
256 mention_text = notification_data.record.text if hasattr(
257 notification_data.record, 'text') else ""
258 author_handle = notification_data.author.handle
259 author_name = notification_data.author.display_name or author_handle
260
261 logger.info(
262 f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")
263
264 # Retrieve the entire thread associated with the mention
265 try:
266 thread = atproto_client.app.bsky.feed.get_post_thread({
267 'uri': uri,
268 'parent_height': threading_config['parent_height'],
269 'depth': threading_config['depth']
270 })
271 except Exception as e:
272 error_str = str(e)
273 # Check for various error types that indicate the post/user is gone
274 if 'NotFound' in error_str or 'Post not found' in error_str:
275 logger.warning(
276 f"Post not found for URI {uri}, removing from queue")
277 return True # Return True to remove from queue
278 elif 'Could not find user info' in error_str or 'InvalidRequest' in error_str:
279 logger.warning(
280 f"User account not found for post URI {uri} (account may be deleted/suspended), removing from queue")
281 return True # Return True to remove from queue
282 elif 'BadRequestError' in error_str:
283 logger.warning(
284 f"Bad request error for URI {uri}: {e}, removing from queue")
285 return True # Return True to remove from queue
286 else:
287 # Re-raise other errors
288 logger.error(f"Error fetching thread: {e}")
289 raise
290
291 # Get thread context as YAML string
292 logger.debug("Converting thread to YAML string")
293 try:
294 thread_context = thread_to_yaml_string(thread)
295 logger.debug(
296 f"Thread context generated, length: {len(thread_context)} characters")
297
298 # Create a more informative preview by extracting meaningful content
299 lines = thread_context.split('\n')
300 meaningful_lines = []
301
302 for line in lines:
303 stripped = line.strip()
304 if not stripped:
305 continue
306
307 # Look for lines with actual content (not just structure)
308 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
309 meaningful_lines.append(line)
310 if len(meaningful_lines) >= 5:
311 break
312
313 if meaningful_lines:
314 preview = '\n'.join(meaningful_lines)
315 logger.debug(f"Thread content preview:\n{preview}")
316 else:
317 # If no content fields found, just show it's a thread structure
318 logger.debug(
319 f"Thread structure generated ({len(thread_context)} chars)")
320 except Exception as yaml_error:
321 import traceback
322 logger.error(f"Error converting thread to YAML: {yaml_error}")
323 logger.error(f"Full traceback:\n{traceback.format_exc()}")
324 logger.error(f"Thread type: {type(thread)}")
325 if hasattr(thread, '__dict__'):
326 logger.error(f"Thread attributes: {thread.__dict__}")
327 # Try to continue with a simple context
328 thread_context = f"Error processing thread context: {str(yaml_error)}"
329
330 # Create a prompt for the Letta agent with thread context
331 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
332
333MOST RECENT POST (the mention you're responding to):
334"{mention_text}"
335
336FULL THREAD CONTEXT:
337```yaml
338{thread_context}
339```
340
341The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.
342
343To reply, use the add_post_to_bluesky_reply_thread tool. Call it multiple times to create a threaded reply:
344- Each call adds one post to the reply thread (max 300 characters per post)
345- Use multiple calls to build longer responses across several posts
346- Example: First call for opening thought, second call for elaboration, etc."""
347
348 # Extract all handles from notification and thread data
349 all_handles = set()
350 all_handles.update(extract_handles_from_data(notification_data))
351 all_handles.update(extract_handles_from_data(thread.model_dump()))
352 unique_handles = list(all_handles)
353
354 logger.debug(
355 f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
356
357 # Attach user blocks before agent call
358 attached_handles = []
359 if unique_handles:
360 try:
361 logger.debug(
362 f"Attaching user blocks for handles: {unique_handles}")
363 attach_result = attach_user_blocks(unique_handles, void_agent)
364 attached_handles = unique_handles # Track successfully attached handles
365 logger.debug(f"Attach result: {attach_result}")
366 except Exception as attach_error:
367 logger.warning(f"Failed to attach user blocks: {attach_error}")
368 # Continue without user blocks rather than failing completely
369
370 # Get response from Letta agent
371 logger.info(f"Mention from @{author_handle}: {mention_text}")
372
373 # Log prompt details to separate logger
374 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
375
376 # Log concise prompt info to main logger
377 thread_handles_count = len(unique_handles)
378 logger.info(
379 f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users")
380
381 try:
382 # Use streaming to avoid 524 timeout errors
383 message_stream = CLIENT.agents.messages.create_stream(
384 agent_id=void_agent.id,
385 messages=[{"role": "user", "content": prompt}],
386 # Step streaming only (faster than token streaming)
387 stream_tokens=False,
388 max_steps=agent_config['max_steps']
389 )
390
391 # Collect the streaming response
392 all_messages = []
393 for chunk in message_stream:
394 # Log condensed chunk info
395 if hasattr(chunk, 'message_type'):
396 if chunk.message_type == 'reasoning_message':
397 # Show full reasoning without truncation
398 logger.info(f"🧠 Reasoning: {chunk.reasoning}")
399 elif chunk.message_type == 'tool_call_message':
400 # Parse tool arguments for better display
401 tool_name = chunk.tool_call.name
402 try:
403 args = json.loads(chunk.tool_call.arguments)
404 # Format based on tool type
405 if tool_name == 'bluesky_reply':
406 messages = args.get(
407 'messages', [args.get('message', '')])
408 lang = args.get('lang', 'en-US')
409 if messages and isinstance(messages, list):
410 preview = messages[0][:100] + "..." if len(
411 messages[0]) > 100 else messages[0]
412 msg_count = f" ({len(messages)} msgs)" if len(
413 messages) > 1 else ""
414 logger.info(
415 f"🔧 Tool call: {tool_name} → \"{preview}\"{msg_count} [lang: {lang}]")
416 else:
417 logger.info(
418 f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
419 elif tool_name == 'archival_memory_search':
420 query = args.get('query', 'unknown')
421 logger.info(
422 f"🔧 Tool call: {tool_name} → query: \"{query}\"")
423 elif tool_name == 'update_block':
424 label = args.get('label', 'unknown')
425 value_preview = str(args.get('value', ''))[
426 :50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
427 logger.info(
428 f"🔧 Tool call: {tool_name} → {label}: \"{value_preview}\"")
429 else:
430 # Generic display for other tools
431 args_str = ', '.join(
432 f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
433 if len(args_str) > 150:
434 args_str = args_str[:150] + "..."
435 logger.info(
436 f"🔧 Tool call: {tool_name}({args_str})")
437 except:
438 # Fallback to original format if parsing fails
439 logger.info(
440 f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
441 elif chunk.message_type == 'tool_return_message':
442 # Enhanced tool result logging
443 tool_name = chunk.name
444 status = chunk.status
445
446 if status == 'success':
447 # Try to show meaningful result info based on tool type
448 if hasattr(chunk, 'tool_return') and chunk.tool_return:
449 result_str = str(chunk.tool_return)
450 if tool_name == 'archival_memory_search':
451 # Count number of results if it looks like a list
452 if result_str.startswith('[') and result_str.endswith(']'):
453 try:
454 results = json.loads(result_str)
455 logger.info(
456 f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries")
457 except:
458 logger.info(
459 f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
460 else:
461 logger.info(
462 f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
463 elif tool_name == 'bluesky_reply':
464 logger.info(
465 f"📋 Tool result: {tool_name} ✓ Reply posted successfully")
466 elif tool_name == 'update_block':
467 logger.info(
468 f"📋 Tool result: {tool_name} ✓ Memory block updated")
469 else:
470 # Generic success with preview
471 preview = result_str[:100] + "..." if len(
472 result_str) > 100 else result_str
473 logger.info(
474 f"📋 Tool result: {tool_name} ✓ {preview}")
475 else:
476 logger.info(f"📋 Tool result: {tool_name} ✓")
477 elif status == 'error':
478 # Show error details
479 error_preview = ""
480 if hasattr(chunk, 'tool_return') and chunk.tool_return:
481 error_str = str(chunk.tool_return)
482 error_preview = error_str[:100] + \
483 "..." if len(
484 error_str) > 100 else error_str
485 logger.info(
486 f"📋 Tool result: {tool_name} ✗ Error: {error_preview}")
487 else:
488 logger.info(
489 f"📋 Tool result: {tool_name} ✗ Error occurred")
490 else:
491 logger.info(
492 f"📋 Tool result: {tool_name} - {status}")
493 elif chunk.message_type == 'assistant_message':
494 logger.info(f"💬 Assistant: {chunk.content[:150]}...")
495 else:
496 logger.info(
497 f"📨 {chunk.message_type}: {str(chunk)[:150]}...")
498 else:
499 logger.info(f"📦 Stream status: {chunk}")
500
501 # Log full chunk for debugging
502 logger.debug(f"Full streaming chunk: {chunk}")
503 all_messages.append(chunk)
504 if str(chunk) == 'done':
505 break
506
507 # Convert streaming response to standard format for compatibility
508 message_response = type('StreamingResponse', (), {
509 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
510 })()
511 except Exception as api_error:
512 import traceback
513 error_str = str(api_error)
514 logger.error(f"Letta API error: {api_error}")
515 logger.error(f"Error type: {type(api_error).__name__}")
516 logger.error(f"Full traceback:\n{traceback.format_exc()}")
517 logger.error(f"Mention text was: {mention_text}")
518 logger.error(f"Author: @{author_handle}")
519 logger.error(f"URI: {uri}")
520
521 # Try to extract more info from different error types
522 if hasattr(api_error, 'response'):
523 logger.error(f"Error response object exists")
524 if hasattr(api_error.response, 'text'):
525 logger.error(f"Response text: {api_error.response.text}")
526 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
527 try:
528 logger.error(
529 f"Response JSON: {api_error.response.json()}")
530 except:
531 pass
532
533 # Check for specific error types
534 if hasattr(api_error, 'status_code'):
535 logger.error(f"API Status code: {api_error.status_code}")
536 if hasattr(api_error, 'body'):
537 logger.error(f"API Response body: {api_error.body}")
538 if hasattr(api_error, 'headers'):
539 logger.error(f"API Response headers: {api_error.headers}")
540
541 if api_error.status_code == 413:
542 logger.error(
543 "413 Payload Too Large - moving to errors directory")
544 return None # Move to errors directory - payload is too large to ever succeed
545 elif api_error.status_code == 524:
546 logger.error(
547 "524 error - timeout from Cloudflare, will retry later")
548 return False # Keep in queue for retry
549
550 # Check if error indicates we should remove from queue
551 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
552 logger.warning(
553 "Payload too large error, moving to errors directory")
554 return None # Move to errors directory - cannot be fixed by retry
555 elif 'status_code: 524' in error_str:
556 logger.warning("524 timeout error, keeping in queue for retry")
557 return False # Keep in queue for retry
558
559 raise
560
561 # Log successful response
562 logger.debug("Successfully received response from Letta API")
563 logger.debug(
564 f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
565
566 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
567 reply_candidates = []
568 tool_call_results = {} # Map tool_call_id to status
569
570 logger.debug(
571 f"Processing {len(message_response.messages)} response messages...")
572
573 # First pass: collect tool return statuses
574 ignored_notification = False
575 ignore_reason = ""
576 ignore_category = ""
577
578 for message in message_response.messages:
579 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
580 if message.name == 'add_post_to_bluesky_reply_thread':
581 tool_call_results[message.tool_call_id] = message.status
582 logger.debug(
583 f"Tool result: {message.tool_call_id} -> {message.status}")
584 elif message.name == 'ignore_notification':
585 # Check if the tool was successful
586 if hasattr(message, 'tool_return') and message.status == 'success':
587 # Parse the return value to extract category and reason
588 result_str = str(message.tool_return)
589 if 'IGNORED_NOTIFICATION::' in result_str:
590 parts = result_str.split('::')
591 if len(parts) >= 3:
592 ignore_category = parts[1]
593 ignore_reason = parts[2]
594 ignored_notification = True
595 logger.info(
596 f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
597 elif message.name == 'bluesky_reply':
598 logger.error(
599 "❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
600 logger.error(
601 "Please use add_post_to_bluesky_reply_thread instead.")
602 logger.error(
603 "Update the agent's tools using register_tools.py")
604 # Export agent state before terminating
605 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
606 logger.info(
607 "=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
608 exit(1)
609
610 # Second pass: process messages and check for successful tool calls
611 for i, message in enumerate(message_response.messages, 1):
612 # Log concise message info instead of full object
613 msg_type = getattr(message, 'message_type', 'unknown')
614 if hasattr(message, 'reasoning') and message.reasoning:
615 logger.debug(
616 f" {i}. {msg_type}: {message.reasoning[:100]}...")
617 elif hasattr(message, 'tool_call') and message.tool_call:
618 tool_name = message.tool_call.name
619 logger.debug(f" {i}. {msg_type}: {tool_name}")
620 elif hasattr(message, 'tool_return'):
621 tool_name = getattr(message, 'name', 'unknown_tool')
622 return_preview = str(message.tool_return)[
623 :100] if message.tool_return else "None"
624 status = getattr(message, 'status', 'unknown')
625 logger.debug(
626 f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
627 elif hasattr(message, 'text'):
628 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
629 else:
630 logger.debug(f" {i}. {msg_type}: <no content>")
631
632 # Check for halt_activity tool call
633 if hasattr(message, 'tool_call') and message.tool_call:
634 if message.tool_call.name == 'halt_activity':
635 logger.info(
636 "🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
637 try:
638 args = json.loads(message.tool_call.arguments)
639 reason = args.get('reason', 'Agent requested halt')
640 logger.info(f"Halt reason: {reason}")
641 except:
642 logger.info("Halt reason: <unable to parse>")
643
644 # Delete the queue file before terminating
645 if queue_filepath and queue_filepath.exists():
646 queue_filepath.unlink()
647 logger.info(
648 f"✅ Deleted queue file: {queue_filepath.name}")
649
650 # Also mark as processed to avoid reprocessing
651 processed_uris = load_processed_notifications()
652 processed_uris.add(notification_data.get('uri', ''))
653 save_processed_notifications(processed_uris)
654
655 # Export agent state before terminating
656 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
657
658 # Exit the program
659 logger.info("=== BOT TERMINATED BY AGENT ===")
660 exit(0)
661
662 # Check for deprecated bluesky_reply tool
663 if hasattr(message, 'tool_call') and message.tool_call:
664 if message.tool_call.name == 'bluesky_reply':
665 logger.error(
666 "❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
667 logger.error(
668 "Please use add_post_to_bluesky_reply_thread instead.")
669 logger.error(
670 "Update the agent's tools using register_tools.py")
671 # Export agent state before terminating
672 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
673 logger.info(
674 "=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
675 exit(1)
676
677 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
678 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
679 tool_call_id = message.tool_call.tool_call_id
680 tool_status = tool_call_results.get(
681 tool_call_id, 'unknown')
682
683 if tool_status == 'success':
684 try:
685 args = json.loads(message.tool_call.arguments)
686 reply_text = args.get('text', '')
687 reply_lang = args.get('lang', 'en-US')
688
689 if reply_text: # Only add if there's actual content
690 reply_candidates.append(
691 (reply_text, reply_lang))
692 logger.info(
693 f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
694 except json.JSONDecodeError as e:
695 logger.error(
696 f"Failed to parse tool call arguments: {e}")
697 elif tool_status == 'error':
698 logger.info(
699 f"⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
700 else:
701 logger.warning(
702 f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
703
704 # Check for conflicting tool calls
705 if reply_candidates and ignored_notification:
706 logger.error(
707 f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
708 logger.error(
709 f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
710 logger.warning("Item will be left in queue for manual review")
711 # Return False to keep in queue
712 return False
713
714 if reply_candidates:
715 # Aggregate reply posts into a thread
716 reply_messages = []
717 reply_langs = []
718 for text, lang in reply_candidates:
719 reply_messages.append(text)
720 reply_langs.append(lang)
721
722 # Use the first language for the entire thread (could be enhanced later)
723 reply_lang = reply_langs[0] if reply_langs else 'en-US'
724
725 logger.info(
726 f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
727
728 # Print the generated reply for testing
729 print(f"\n=== GENERATED REPLY THREAD ===")
730 print(f"To: @{author_handle}")
731 if len(reply_messages) == 1:
732 print(f"Reply: {reply_messages[0]}")
733 else:
734 print(f"Reply thread ({len(reply_messages)} messages):")
735 for j, msg in enumerate(reply_messages, 1):
736 print(f" {j}. {msg}")
737 print(f"Language: {reply_lang}")
738 print(f"======================\n")
739
740 # Send the reply(s) with language (unless in testing mode)
741 if testing_mode:
742 logger.info("🧪 TESTING MODE: Skipping actual Bluesky post")
743 response = True # Simulate success
744 else:
745 if len(reply_messages) == 1:
746 # Single reply - use existing function
747 cleaned_text = bsky_utils.remove_outside_quotes(
748 reply_messages[0])
749 logger.info(
750 f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
751 response = bsky_utils.reply_to_notification(
752 client=atproto_client,
753 notification=notification_data,
754 reply_text=cleaned_text,
755 lang=reply_lang
756 )
757 else:
758 # Multiple replies - use new threaded function
759 cleaned_messages = [bsky_utils.remove_outside_quotes(
760 msg) for msg in reply_messages]
761 logger.info(
762 f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
763 response = bsky_utils.reply_with_thread_to_notification(
764 client=atproto_client,
765 notification=notification_data,
766 reply_messages=cleaned_messages,
767 lang=reply_lang
768 )
769
770 if response:
771 logger.info(f"Successfully replied to @{author_handle}")
772 return True
773 else:
774 logger.error(f"Failed to send reply to @{author_handle}")
775 return False
776 else:
777 # Check if notification was explicitly ignored
778 if ignored_notification:
779 logger.info(
780 f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})")
781 return "ignored"
782 else:
783 logger.warning(
784 f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder")
785 return "no_reply"
786
787 except Exception as e:
788 logger.error(f"Error processing mention: {e}")
789 return False
790 finally:
791 # Detach user blocks after agent response (success or failure)
792 if 'attached_handles' in locals() and attached_handles:
793 try:
794 logger.info(
795 f"Detaching user blocks for handles: {attached_handles}")
796 detach_result = detach_user_blocks(
797 attached_handles, void_agent)
798 logger.debug(f"Detach result: {detach_result}")
799 except Exception as detach_error:
800 logger.warning(f"Failed to detach user blocks: {detach_error}")
801
802
803def notification_to_dict(notification):
804 """Convert a notification object to a dictionary for JSON serialization."""
805 return {
806 'uri': notification.uri,
807 'cid': notification.cid,
808 'reason': notification.reason,
809 'is_read': notification.is_read,
810 'indexed_at': notification.indexed_at,
811 'author': {
812 'handle': notification.author.handle,
813 'display_name': notification.author.display_name,
814 'did': notification.author.did
815 },
816 'record': {
817 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
818 }
819 }
820
821
822def load_processed_notifications():
823 """Load the set of processed notification URIs."""
824 if PROCESSED_NOTIFICATIONS_FILE.exists():
825 try:
826 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
827 data = json.load(f)
828 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS)
829 if len(data) > MAX_PROCESSED_NOTIFICATIONS:
830 data = data[-MAX_PROCESSED_NOTIFICATIONS:]
831 save_processed_notifications(data)
832 return set(data)
833 except Exception as e:
834 logger.error(f"Error loading processed notifications: {e}")
835 return set()
836
837
838def save_processed_notifications(processed_set):
839 """Save the set of processed notification URIs."""
840 try:
841 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
842 json.dump(list(processed_set), f)
843 except Exception as e:
844 logger.error(f"Error saving processed notifications: {e}")
845
846
847def save_notification_to_queue(notification):
848 """Save a notification to the queue directory with priority-based filename."""
849 try:
850 # Check if already processed
851 processed_uris = load_processed_notifications()
852 if notification.uri in processed_uris:
853 logger.debug(f"Notification already processed: {notification.uri}")
854 return False
855
856 # Convert notification to dict
857 notif_dict = notification_to_dict(notification)
858
859 # Create JSON string
860 notif_json = json.dumps(notif_dict, sort_keys=True)
861
862 # Generate hash for filename (to avoid duplicates)
863 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
864
865 # Determine priority based on author handle
866 author_handle = getattr(notification.author, 'handle', '') if hasattr(
867 notification, 'author') else ''
868 priority_users = queue_config['priority_users']
869 priority_prefix = "0_" if author_handle in priority_users else "1_"
870
871 # Create filename with priority, timestamp and hash
872 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
873 filename = f"{priority_prefix}{timestamp}_{notification.reason}_{notif_hash}.json"
874 filepath = QUEUE_DIR / filename
875
876 # Check if this notification URI is already in the queue
877 for existing_file in QUEUE_DIR.glob("*.json"):
878 if existing_file.name == "processed_notifications.json":
879 continue
880 try:
881 with open(existing_file, 'r') as f:
882 existing_data = json.load(f)
883 if existing_data.get('uri') == notification.uri:
884 logger.debug(
885 f"Notification already queued (URI: {notification.uri})")
886 return False
887 except:
888 continue
889
890 # Write to file
891 with open(filepath, 'w') as f:
892 json.dump(notif_dict, f, indent=2)
893
894 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
895 logger.info(f"Queued notification ({priority_label}): {filename}")
896 return True
897
898 except Exception as e:
899 logger.error(f"Error saving notification to queue: {e}")
900 return False
901
902
903def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
904 """Load and process all notifications from the queue in priority order."""
905 try:
906 # Get all JSON files in queue directory (excluding processed_notifications.json)
907 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
908 queue_files = sorted([f for f in QUEUE_DIR.glob(
909 "*.json") if f.name != "processed_notifications.json"])
910
911 if not queue_files:
912 return
913
914 logger.info(f"Processing {len(queue_files)} queued notifications")
915
916 # Log current statistics
917 elapsed_time = time.time() - start_time
918 total_messages = sum(message_counters.values())
919 messages_per_minute = (
920 total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
921
922 logger.info(
923 f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
924
925 for i, filepath in enumerate(queue_files, 1):
926 logger.info(
927 f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
928 try:
929 # Load notification data
930 with open(filepath, 'r') as f:
931 notif_data = json.load(f)
932
933 # Process based on type using dict data directly
934 success = False
935 if notif_data['reason'] == "mention":
936 success = process_mention(
937 void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
938 if success:
939 message_counters['mentions'] += 1
940 elif notif_data['reason'] == "reply":
941 success = process_mention(
942 void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
943 if success:
944 message_counters['replies'] += 1
945 elif notif_data['reason'] == "follow":
946 author_handle = notif_data['author']['handle']
947 author_display_name = notif_data['author'].get(
948 'display_name', 'no display name')
949 follow_update = f"@{author_handle} ({author_display_name}) started following you."
950 logger.info(
951 f"Notifying agent about new follower: @{author_handle}")
952 CLIENT.agents.messages.create(
953 agent_id=void_agent.id,
954 messages=[
955 {"role": "user", "content": f"Update: {follow_update}"}]
956 )
957 success = True # Follow updates are always successful
958 if success:
959 message_counters['follows'] += 1
960 elif notif_data['reason'] == "repost":
961 # Skip reposts silently
962 success = True # Skip reposts but mark as successful to remove from queue
963 if success:
964 message_counters['reposts_skipped'] += 1
965 else:
966 logger.warning(
967 f"Unknown notification type: {notif_data['reason']}")
968 success = True # Remove unknown types from queue
969
970 # Handle file based on processing result
971 if success:
972 if testing_mode:
973 logger.info(
974 f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
975 else:
976 filepath.unlink()
977 logger.info(
978 f"✅ Successfully processed and removed: {filepath.name}")
979
980 # Mark as processed to avoid reprocessing
981 processed_uris = load_processed_notifications()
982 processed_uris.add(notif_data['uri'])
983 save_processed_notifications(processed_uris)
984
985 elif success is None: # Special case for moving to error directory
986 error_path = QUEUE_ERROR_DIR / filepath.name
987 filepath.rename(error_path)
988 logger.warning(
989 f"❌ Moved {filepath.name} to errors directory")
990
991 # Also mark as processed to avoid retrying
992 processed_uris = load_processed_notifications()
993 processed_uris.add(notif_data['uri'])
994 save_processed_notifications(processed_uris)
995
996 elif success == "no_reply": # Special case for moving to no_reply directory
997 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
998 filepath.rename(no_reply_path)
999 logger.info(
1000 f"📭 Moved {filepath.name} to no_reply directory")
1001
1002 # Also mark as processed to avoid retrying
1003 processed_uris = load_processed_notifications()
1004 processed_uris.add(notif_data['uri'])
1005 save_processed_notifications(processed_uris)
1006
1007 elif success == "ignored": # Special case for explicitly ignored notifications
1008 # For ignored notifications, we just delete them (not move to no_reply)
1009 filepath.unlink()
1010 logger.info(
1011 f"🚫 Deleted ignored notification: {filepath.name}")
1012
1013 # Also mark as processed to avoid retrying
1014 processed_uris = load_processed_notifications()
1015 processed_uris.add(notif_data['uri'])
1016 save_processed_notifications(processed_uris)
1017
1018 else:
1019 logger.warning(
1020 f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
1021
1022 except Exception as e:
1023 logger.error(
1024 f"💥 Error processing queued notification {filepath.name}: {e}")
1025 # Keep the file for retry later
1026
1027 except Exception as e:
1028 logger.error(f"Error loading queued notifications: {e}")
1029
1030
1031def process_notifications(void_agent, atproto_client, testing_mode=False):
1032 """Fetch new notifications, queue them, and process the queue."""
1033 try:
1034 # Get current time for marking notifications as seen
1035 logger.debug("Getting current time for notification marking...")
1036 last_seen_at = atproto_client.get_current_time_iso()
1037
1038 # Fetch ALL notifications using pagination first
1039 logger.info("Beginning notification fetch with pagination...")
1040 all_notifications = []
1041 cursor = None
1042 page_count = 0
1043 # Safety limit to prevent infinite loops
1044 max_pages = bot_config['max_notification_pages']
1045
1046 logger.info("Fetching all unread notifications...")
1047
1048 while page_count < max_pages:
1049 try:
1050 # Fetch notifications page
1051 if cursor:
1052 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1053 params={'cursor': cursor, 'limit': 100}
1054 )
1055 else:
1056 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1057 params={'limit': 100}
1058 )
1059
1060 page_count += 1
1061 page_notifications = notifications_response.notifications
1062
1063 # Count unread notifications in this page
1064 unread_count = sum(
1065 1 for n in page_notifications if not n.is_read and n.reason != "like")
1066 logger.debug(
1067 f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")
1068
1069 # Add all notifications to our list
1070 all_notifications.extend(page_notifications)
1071
1072 # Check if we have more pages
1073 if hasattr(notifications_response, 'cursor') and notifications_response.cursor:
1074 cursor = notifications_response.cursor
1075 # If this page had no unread notifications, we can stop
1076 if unread_count == 0:
1077 logger.info(
1078 f"No more unread notifications found after {page_count} pages")
1079 break
1080 else:
1081 # No more pages
1082 logger.info(
1083 f"Fetched all notifications across {page_count} pages")
1084 break
1085
1086 except Exception as e:
1087 error_str = str(e)
1088 logger.error(
1089 f"Error fetching notifications page {page_count}: {e}")
1090
1091 # Handle specific API errors
1092 if 'rate limit' in error_str.lower():
1093 logger.warning(
1094 "Rate limit hit while fetching notifications, will retry next cycle")
1095 break
1096 elif '401' in error_str or 'unauthorized' in error_str.lower():
1097 logger.error("Authentication error, re-raising exception")
1098 raise
1099 else:
1100 # For other errors, try to continue with what we have
1101 logger.warning(
1102 "Continuing with notifications fetched so far")
1103 break
1104
1105 # Queue all unread notifications (except likes)
1106 logger.info("Queuing unread notifications...")
1107 new_count = 0
1108 for notification in all_notifications:
1109 if not notification.is_read and notification.reason != "like":
1110 if save_notification_to_queue(notification):
1111 new_count += 1
1112
1113 # Mark all notifications as seen immediately after queuing (unless in testing mode)
1114 if testing_mode:
1115 logger.info(
1116 "🧪 TESTING MODE: Skipping marking notifications as seen")
1117 else:
1118 if new_count > 0:
1119 atproto_client.app.bsky.notification.update_seen(
1120 {'seen_at': last_seen_at})
1121 logger.info(
1122 f"Queued {new_count} new notifications and marked as seen")
1123 else:
1124 logger.debug("No new notifications to queue")
1125
1126 # Now process the entire queue (old + new notifications)
1127 load_and_process_queued_notifications(
1128 void_agent, atproto_client, testing_mode)
1129
1130 except Exception as e:
1131 logger.error(f"Error processing notifications: {e}")
1132
1133
1134def main():
1135 # Parse command line arguments
1136 parser = argparse.ArgumentParser(
1137 description='Void Bot - Bluesky autonomous agent')
1138 parser.add_argument('--test', action='store_true',
1139 help='Run in testing mode (no messages sent, queue files preserved)')
1140 parser.add_argument('--no-git', action='store_true',
1141 help='Skip git operations when exporting agent state')
1142 args = parser.parse_args()
1143
1144 global TESTING_MODE
1145 TESTING_MODE = args.test
1146
1147 # Store no-git flag globally for use in export_agent_state calls
1148 global SKIP_GIT
1149 SKIP_GIT = args.no_git
1150
1151 if TESTING_MODE:
1152 logger.info("🧪 === RUNNING IN TESTING MODE ===")
1153 logger.info(" - No messages will be sent to Bluesky")
1154 logger.info(" - Queue files will not be deleted")
1155 logger.info(" - Notifications will not be marked as seen")
1156 print("\n")
1157 """Main bot loop that continuously monitors for notifications."""
1158 global start_time
1159 start_time = time.time()
1160 logger.info("=== STARTING VOID BOT ===")
1161 void_agent = initialize_void()
1162 logger.info(f"Void agent initialized: {void_agent.id}")
1163
1164 # Check if agent has required tools
1165 if hasattr(void_agent, 'tools') and void_agent.tools:
1166 tool_names = [tool.name for tool in void_agent.tools]
1167 # Check for bluesky-related tools
1168 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower(
1169 ) or 'reply' in name.lower()]
1170 if not bluesky_tools:
1171 logger.warning(
1172 "No Bluesky-related tools found! Agent may not be able to reply.")
1173 else:
1174 logger.warning("Agent has no tools registered!")
1175
1176 # Initialize Bluesky client
1177 logger.debug("Connecting to Bluesky")
1178 atproto_client = bsky_utils.default_login()
1179 logger.info("Connected to Bluesky")
1180
1181 # Main loop
1182 logger.info(
1183 f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
1184
1185 cycle_count = 0
1186 while True:
1187 try:
1188 cycle_count += 1
1189 process_notifications(void_agent, atproto_client, TESTING_MODE)
1190 # Log cycle completion with stats
1191 elapsed_time = time.time() - start_time
1192 total_messages = sum(message_counters.values())
1193 messages_per_minute = (
1194 total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1195
1196 if total_messages > 0:
1197 logger.info(
1198 f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
1199 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
1200
1201 except KeyboardInterrupt:
1202 # Final stats
1203 elapsed_time = time.time() - start_time
1204 total_messages = sum(message_counters.values())
1205 messages_per_minute = (
1206 total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1207
1208 logger.info("=== BOT STOPPED BY USER ===")
1209 logger.info(
1210 f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
1211 logger.info(f" - {message_counters['mentions']} mentions")
1212 logger.info(f" - {message_counters['replies']} replies")
1213 logger.info(f" - {message_counters['follows']} follows")
1214 logger.info(
1215 f" - {message_counters['reposts_skipped']} reposts skipped")
1216 logger.info(
1217 f" - Average rate: {messages_per_minute:.1f} messages/minute")
1218 break
1219 except Exception as e:
1220 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
1221 logger.error(f"Error details: {e}")
1222 # Wait a bit longer on errors
1223 logger.info(
1224 f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
1225 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
1226
1227
1228if __name__ == "__main__":
1229 main()