a digital person for bluesky
1# Rich imports removed - using simple text formatting
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20from config_loader import get_letta_config, get_config, get_queue_config
21
22import bsky_utils
23from tools.blocks import attach_user_blocks, detach_user_blocks
24from datetime import date
25from notification_db import NotificationDB
26
27def extract_handles_from_data(data):
28 """Recursively extract all unique handles from nested data structure."""
29 handles = set()
30
31 def _extract_recursive(obj):
32 if isinstance(obj, dict):
33 # Check if this dict has a 'handle' key
34 if 'handle' in obj:
35 handles.add(obj['handle'])
36 # Recursively check all values
37 for value in obj.values():
38 _extract_recursive(value)
39 elif isinstance(obj, list):
40 # Recursively check all list items
41 for item in obj:
42 _extract_recursive(item)
43
44 _extract_recursive(data)
45 return list(handles)
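# Example (hypothetical input):
#   extract_handles_from_data({'post': {'author': {'handle': 'alice.example'},
#                                        'replies': [{'author': {'handle': 'bob.example'}}]}})
#   -> ['alice.example', 'bob.example'] (order not guaranteed; a set is used internally)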
46
47# Logging will be configured after argument parsing
48logger = None
49prompt_logger = None
50# Simple text formatting (Rich no longer used)
51SHOW_REASONING = False
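# Most recent archival_memory_search query, captured from tool calls so search results can be labeled with the query that produced them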
52last_archival_query = "archival memory search"
53
54def log_with_panel(message, title=None, border_color="white"):
55 """Log a message with Unicode box-drawing characters"""
56 if title:
57 # Map old color names to appropriate symbols
58 symbol_map = {
59 "blue": "⚙", # Tool calls
60 "green": "✓", # Success/completion
61 "yellow": "◆", # Reasoning
62 "red": "✗", # Errors
63 "white": "▶", # Default/mentions
64 "cyan": "✎", # Posts
65 }
66 symbol = symbol_map.get(border_color, "▶")
67
68 print(f"\n{symbol} {title}")
69 print(f" {'─' * len(title)}")
70 # Indent message lines
71 for line in message.split('\n'):
72 print(f" {line}")
73 else:
74 print(message)
75
76
77# Load Letta configuration from config.yaml (will be initialized later with custom path if provided)
78letta_config = None
79CLIENT = None
80
81# Notification check delay
82FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response
83
84# Check for new notifications every N queue items
85CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing
86
87# Queue paths (will be initialized from config in main())
88QUEUE_DIR = None
89QUEUE_ERROR_DIR = None
90QUEUE_NO_REPLY_DIR = None
91PROCESSED_NOTIFICATIONS_FILE = None
92
93# Maximum number of processed notifications to track
94MAX_PROCESSED_NOTIFICATIONS = 10000
95
96# Message tracking counters
97message_counters = defaultdict(int)
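# Counter keys used below: 'mentions', 'replies', 'follows', 'reposts_skipped', 'likes_skipped'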
98start_time = time.time()
99
100# Testing mode flag
101TESTING_MODE = False
102
103# Skip git operations flag
104SKIP_GIT = False
105
106# Synthesis message tracking
107last_synthesis_time = time.time()
108
109# Database for notification tracking
110NOTIFICATION_DB = None
111
112def export_agent_state(client, agent, skip_git=False):
113 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
114 try:
115 # Confirm export with user unless git is being skipped
116 if not skip_git:
117 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
118 if response not in ['y', 'yes']:
119 logger.info("Agent export cancelled by user.")
120 return
121 else:
122 logger.info("Exporting agent state (git staging disabled)")
123
124 # Create directories if they don't exist
125 os.makedirs("agent_archive", exist_ok=True)
126 os.makedirs("agents", exist_ok=True)
127
128 # Export agent data
129 logger.info(f"Exporting agent {agent.id}. This takes some time...")
130 agent_data = client.agents.export_file(agent_id=agent.id)
131
132 # Save timestamped archive copy
133 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
134 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
135 with open(archive_file, 'w', encoding='utf-8') as f:
136 json.dump(agent_data, f, indent=2, ensure_ascii=False)
137
138 # Save current agent state
139 current_file = os.path.join("agents", "void.af")
140 with open(current_file, 'w', encoding='utf-8') as f:
141 json.dump(agent_data, f, indent=2, ensure_ascii=False)
142
143 logger.info(f"Agent exported to {archive_file} and {current_file}")
144
145 # Git add only the current agent file (archive is ignored) unless skip_git is True
146 if not skip_git:
147 try:
148 subprocess.run(["git", "add", current_file], check=True, capture_output=True)
149 logger.info("Added current agent file to git staging")
150 except subprocess.CalledProcessError as e:
151 logger.warning(f"Failed to git add agent file: {e}")
152
153 except Exception as e:
154 logger.error(f"Failed to export agent: {e}")
155
156def initialize_void():
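    """Load the configured void agent from Letta, export its state, and log its details."""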
157 logger.info("Starting void agent initialization...")
158
159 # Get the configured void agent by ID
160 logger.info("Loading void agent from config...")
161 agent_id = letta_config['agent_id']
162
163 try:
164 void_agent = CLIENT.agents.retrieve(agent_id=agent_id)
165 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})")
166 except Exception as e:
167 logger.error(f"Failed to load void agent {agent_id}: {e}")
168 logger.error("Please ensure the agent_id in config.yaml is correct")
169 raise e
170
171 # Export agent state
172 logger.info("Exporting agent state...")
173 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
174
175 # Log agent details
176 logger.info(f"Void agent details - ID: {void_agent.id}")
177 logger.info(f"Agent name: {void_agent.name}")
178 if hasattr(void_agent, 'llm_config'):
179 logger.info(f"Agent model: {void_agent.llm_config.model}")
180 if hasattr(void_agent, 'project_id') and void_agent.project_id:
181 logger.info(f"Agent project_id: {void_agent.project_id}")
182 if hasattr(void_agent, 'tools'):
183 logger.info(f"Agent has {len(void_agent.tools)} tools")
184 for tool in void_agent.tools[:3]: # Show first 3 tools
185 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")
186
187 return void_agent
188
189
190def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
191 """Process a mention and generate a reply using the Letta agent.
192
193 Args:
194 void_agent: The Letta agent instance
195 atproto_client: The AT Protocol client
196 notification_data: The notification data dictionary
197 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
198
199 Returns:
200 True: Successfully processed, remove from queue
201 False: Failed but retryable, keep in queue
202 None: Failed with non-retryable error, move to errors directory
203 "no_reply": No reply was generated, move to no_reply directory
204 """
205 import uuid
206
207 # Generate correlation ID for tracking this notification through the pipeline
208 correlation_id = str(uuid.uuid4())[:8]
209
210 try:
211 logger.info(f"[{correlation_id}] Starting process_mention", extra={
212 'correlation_id': correlation_id,
213 'notification_type': type(notification_data).__name__
214 })
215
216 # Handle both dict and object inputs for backwards compatibility
217 if isinstance(notification_data, dict):
218 uri = notification_data['uri']
219 mention_text = notification_data.get('record', {}).get('text', '')
220 author_handle = notification_data['author']['handle']
221 author_name = notification_data['author'].get('display_name') or author_handle
222 else:
223 # Legacy object access
224 uri = notification_data.uri
225 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
226 author_handle = notification_data.author.handle
227 author_name = notification_data.author.display_name or author_handle
228
229 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={
230 'correlation_id': correlation_id,
231 'author_handle': author_handle,
232 'author_name': author_name,
233 'mention_uri': uri,
234 'mention_text_length': len(mention_text),
235 'mention_preview': mention_text[:100] if mention_text else ''
236 })
237
238 # Retrieve the entire thread associated with the mention
239 try:
240 thread = atproto_client.app.bsky.feed.get_post_thread({
241 'uri': uri,
242 'parent_height': 40,
243 'depth': 10
244 })
245 except Exception as e:
246 error_str = str(e)
247 # Check if this is a NotFound error
248 if 'NotFound' in error_str or 'Post not found' in error_str:
249 logger.warning(f"Post not found for URI {uri}, removing from queue")
250 return True # Return True to remove from queue
251 else:
252 # Re-raise other errors
253 logger.error(f"Error fetching thread: {e}")
254 raise
255
256 # Get thread context as YAML string
257 logger.debug("Converting thread to YAML string")
258 try:
259 thread_context = thread_to_yaml_string(thread)
260 logger.debug(f"Thread context generated, length: {len(thread_context)} characters")
261
262 # Check if #voidstop appears anywhere in the thread
263 if "#voidstop" in thread_context.lower():
264 logger.info("Found #voidstop in thread context, skipping this mention")
265 return True # Return True to remove from queue
266
267 # Also check the mention text directly
268 if "#voidstop" in mention_text.lower():
269 logger.info("Found #voidstop in mention text, skipping this mention")
270 return True # Return True to remove from queue
271
272 # Create a more informative preview by extracting meaningful content
273 lines = thread_context.split('\n')
274 meaningful_lines = []
275
276 for line in lines:
277 stripped = line.strip()
278 if not stripped:
279 continue
280
281 # Look for lines with actual content (not just structure)
282 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
283 meaningful_lines.append(line)
284 if len(meaningful_lines) >= 5:
285 break
286
287 if meaningful_lines:
288 preview = '\n'.join(meaningful_lines)
289 logger.debug(f"Thread content preview:\n{preview}")
290 else:
291 # If no content fields found, just show it's a thread structure
292 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
293 except Exception as yaml_error:
294 import traceback
295 logger.error(f"Error converting thread to YAML: {yaml_error}")
296 logger.error(f"Full traceback:\n{traceback.format_exc()}")
297 logger.error(f"Thread type: {type(thread)}")
298 if hasattr(thread, '__dict__'):
299 logger.error(f"Thread attributes: {thread.__dict__}")
300 # Try to continue with a simple context
301 thread_context = f"Error processing thread context: {str(yaml_error)}"
302
303 # Create a prompt for the Letta agent with thread context
304 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
305
306MOST RECENT POST:
307"{mention_text}"
308
309THREAD CONTEXT:
310```yaml
311{thread_context}
312```
313
314If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call creates one post (max 300 characters). You may use multiple calls to create a thread if needed."""
315
316 # Extract all handles from notification and thread data
317 all_handles = set()
318 all_handles.update(extract_handles_from_data(notification_data))
319 all_handles.update(extract_handles_from_data(thread.model_dump()))
320 unique_handles = list(all_handles)
321
322 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
323
324 # Check if any handles are in known_bots list
        from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs
327
328 try:
329 # Check for known bots in thread
330 bot_check_result = check_known_bots(unique_handles, void_agent)
331 bot_check_data = json.loads(bot_check_result)
332
333 # TEMPORARILY DISABLED: Bot detection causing issues with normal users
334 # TODO: Re-enable after debugging why normal users are being flagged as bots
335 if False: # bot_check_data.get("bot_detected", False):
336 detected_bots = bot_check_data.get("detected_bots", [])
337 logger.info(f"Bot detected in thread: {detected_bots}")
338
339 # Decide whether to respond (10% chance)
340 if not should_respond_to_bot_thread():
341 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}")
342 # Return False to keep in queue for potential later processing
343 return False
344 else:
345 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}")
346 else:
347 logger.debug("Bot detection disabled - processing all notifications")
348
349 except Exception as bot_check_error:
350 logger.warning(f"Error checking for bots: {bot_check_error}")
351 # Continue processing if bot check fails
352
353 # Attach user blocks before agent call
354 attached_handles = []
355 if unique_handles:
356 try:
357 logger.debug(f"Attaching user blocks for handles: {unique_handles}")
358 attach_result = attach_user_blocks(unique_handles, void_agent)
359 attached_handles = unique_handles # Track successfully attached handles
360 logger.debug(f"Attach result: {attach_result}")
361 except Exception as attach_error:
362 logger.warning(f"Failed to attach user blocks: {attach_error}")
363 # Continue without user blocks rather than failing completely
364
365 # Get response from Letta agent
366 # Format with Unicode characters
367 title = f"MENTION FROM @{author_handle}"
368 print(f"\n▶ {title}")
369 print(f" {'═' * len(title)}")
370 # Indent the mention text
371 for line in mention_text.split('\n'):
372 print(f" {line}")
373
374 # Log prompt details to separate logger
375 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
376
377 # Log concise prompt info to main logger
378 thread_handles_count = len(unique_handles)
379 prompt_char_count = len(prompt)
380 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars")
381
382 try:
383 # Use streaming to avoid 524 timeout errors
384 message_stream = CLIENT.agents.messages.create_stream(
385 agent_id=void_agent.id,
386 messages=[{"role": "user", "content": prompt}],
387 stream_tokens=False, # Step streaming only (faster than token streaming)
388 max_steps=100
389 )
390
391 # Collect the streaming response
392 all_messages = []
393 for chunk in message_stream:
394 # Log condensed chunk info
395 if hasattr(chunk, 'message_type'):
396 if chunk.message_type == 'reasoning_message':
                        # Show the full reasoning without truncation
                        print("\n◆ Reasoning")
                        print(" ─────────")
                        # Indent reasoning lines
                        for line in chunk.reasoning.split('\n'):
                            print(f" {line}")
413
414 # Create ATProto record for reasoning (unless in testing mode)
415 if not testing_mode and hasattr(chunk, 'reasoning'):
416 try:
417 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
418 except Exception as e:
419 logger.debug(f"Failed to create reasoning record: {e}")
420 elif chunk.message_type == 'tool_call_message':
421 # Parse tool arguments for better display
422 tool_name = chunk.tool_call.name
423
424 # Create ATProto record for tool call (unless in testing mode)
425 if not testing_mode:
426 try:
427 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
428 bsky_utils.create_tool_call_record(
429 atproto_client,
430 tool_name,
431 chunk.tool_call.arguments,
432 tool_call_id
433 )
434 except Exception as e:
435 logger.debug(f"Failed to create tool call record: {e}")
436
437 try:
438 args = json.loads(chunk.tool_call.arguments)
439 # Format based on tool type
440 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']:
441 # Extract the text being posted
442 text = args.get('text', '')
443 if text:
444 # Format with Unicode characters
445 print("\n✎ Bluesky Post")
446 print(" ────────────")
447 # Indent post text
448 for line in text.split('\n'):
449 print(f" {line}")
450 else:
451 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
452 elif tool_name == 'archival_memory_search':
453 query = args.get('query', 'unknown')
454 global last_archival_query
455 last_archival_query = query
456 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
457 elif tool_name == 'archival_memory_insert':
458 content = args.get('content', '')
459 # Show the full content being inserted
460 log_with_panel(content, f"Tool call: {tool_name}", "blue")
461 elif tool_name == 'update_block':
462 label = args.get('label', 'unknown')
463 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
464 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
465 else:
466 # Generic display for other tools
467 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
468 if len(args_str) > 150:
469 args_str = args_str[:150] + "..."
470 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                        except Exception:
472 # Fallback to original format if parsing fails
473 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
474 elif chunk.message_type == 'tool_return_message':
475 # Enhanced tool result logging
476 tool_name = chunk.name
477 status = chunk.status
478
479 if status == 'success':
480 # Try to show meaningful result info based on tool type
481 if hasattr(chunk, 'tool_return') and chunk.tool_return:
482 result_str = str(chunk.tool_return)
483 if tool_name == 'archival_memory_search':
484
485 try:
486 # Handle both string and list formats
487 if isinstance(chunk.tool_return, str):
488 # The string format is: "([{...}, {...}], count)"
489 # We need to extract just the list part
490 if chunk.tool_return.strip():
491 # Find the list part between the first [ and last ]
492 start_idx = chunk.tool_return.find('[')
493 end_idx = chunk.tool_return.rfind(']')
494 if start_idx != -1 and end_idx != -1:
495 list_str = chunk.tool_return[start_idx:end_idx+1]
496 # Use ast.literal_eval since this is Python literal syntax, not JSON
497 import ast
498 results = ast.literal_eval(list_str)
499 else:
500 logger.warning("Could not find list in archival_memory_search result")
501 results = []
502 else:
503 logger.warning("Empty string returned from archival_memory_search")
504 results = []
505 else:
506 # If it's already a list, use directly
507 results = chunk.tool_return
508
509 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green")
510
511 # Use the captured search query from the tool call
512 search_query = last_archival_query
513
514 # Combine all results into a single text block
515 content_text = ""
516 for i, entry in enumerate(results, 1):
517 timestamp = entry.get('timestamp', 'N/A')
518 content = entry.get('content', '')
519 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n"
520
521 # Format with Unicode characters
522 title = f"{search_query} ({len(results)} results)"
523 print(f"\n⚙ {title}")
524 print(f" {'─' * len(title)}")
525 # Indent content text
526 for line in content_text.strip().split('\n'):
527 print(f" {line}")
528
529 except Exception as e:
530 logger.error(f"Error formatting archival memory results: {e}")
531 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green")
532 elif tool_name == 'add_post_to_bluesky_reply_thread':
533 # Just show success for bluesky posts, the text was already shown in tool call
534 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green")
535 elif tool_name == 'archival_memory_insert':
536 # Skip archival memory insert results (always returns None)
537 pass
538 elif tool_name == 'update_block':
539 log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green")
540 else:
541 # Generic success with preview
542 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
543 log_with_panel(preview, f"Tool result: {tool_name} ✓", "green")
544 else:
545 log_with_panel("Success", f"Tool result: {tool_name} ✓", "green")
546 elif status == 'error':
547 # Show error details
548 if tool_name == 'add_post_to_bluesky_reply_thread':
549 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred"
550 log_with_panel(error_str, f"Bluesky Post ✗", "red")
551 elif tool_name == 'archival_memory_insert':
552 # Skip archival memory insert errors too
553 pass
554 else:
555 error_preview = ""
556 if hasattr(chunk, 'tool_return') and chunk.tool_return:
557 error_str = str(chunk.tool_return)
558 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
559 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red")
560 else:
561 log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red")
562 else:
563 logger.info(f"Tool result: {tool_name} - {status}")
564 elif chunk.message_type == 'assistant_message':
565 # Format with Unicode characters
566 print("\n▶ Assistant Response")
567 print(" ──────────────────")
568 # Indent response text
569 for line in chunk.content.split('\n'):
570 print(f" {line}")
571 else:
572 # Filter out verbose message types
573 if chunk.message_type not in ['usage_statistics', 'stop_reason']:
574 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...")
575 else:
576 logger.info(f"📦 Stream status: {chunk}")
577
578 # Log full chunk for debugging
579 logger.debug(f"Full streaming chunk: {chunk}")
580 all_messages.append(chunk)
581 if str(chunk) == 'done':
582 break
583
584 # Convert streaming response to standard format for compatibility
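        # (an ad-hoc object exposing a .messages list, mirroring the shape of the non-streaming response)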
585 message_response = type('StreamingResponse', (), {
586 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
587 })()
588 except Exception as api_error:
589 import traceback
590 error_str = str(api_error)
591 logger.error(f"Letta API error: {api_error}")
592 logger.error(f"Error type: {type(api_error).__name__}")
593 logger.error(f"Full traceback:\n{traceback.format_exc()}")
594 logger.error(f"Mention text was: {mention_text}")
595 logger.error(f"Author: @{author_handle}")
596 logger.error(f"URI: {uri}")
597
598
599 # Try to extract more info from different error types
600 if hasattr(api_error, 'response'):
601 logger.error(f"Error response object exists")
602 if hasattr(api_error.response, 'text'):
603 logger.error(f"Response text: {api_error.response.text}")
604 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
605 try:
606 logger.error(f"Response JSON: {api_error.response.json()}")
                    except Exception:
608 pass
609
610 # Check for specific error types
611 if hasattr(api_error, 'status_code'):
612 logger.error(f"API Status code: {api_error.status_code}")
613 if hasattr(api_error, 'body'):
614 logger.error(f"API Response body: {api_error.body}")
615 if hasattr(api_error, 'headers'):
616 logger.error(f"API Response headers: {api_error.headers}")
617
            # Not every exception type carries status_code, so check defensively
            if getattr(api_error, 'status_code', None) == 413:
                logger.error("413 Payload Too Large - moving to errors directory")
                return None  # Move to errors directory - payload is too large to ever succeed
            elif getattr(api_error, 'status_code', None) == 524:
                logger.error("524 error - timeout from Cloudflare, will retry later")
                return False  # Keep in queue for retry
624
625 # Check if error indicates we should remove from queue
626 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
627 logger.warning("Payload too large error, moving to errors directory")
628 return None # Move to errors directory - cannot be fixed by retry
629 elif 'status_code: 524' in error_str:
630 logger.warning("524 timeout error, keeping in queue for retry")
631 return False # Keep in queue for retry
632
633 raise
634
635 # Log successful response
636 logger.debug("Successfully received response from Letta API")
637 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
638
639 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
640 reply_candidates = []
641 tool_call_results = {} # Map tool_call_id to status
642 ack_note = None # Track any note from annotate_ack tool
643 flagged_memories = [] # Track memories flagged for deletion
644
645 logger.debug(f"Processing {len(message_response.messages)} response messages...")
646
647 # First pass: collect tool return statuses
648 ignored_notification = False
649 ignore_reason = ""
650 ignore_category = ""
651
652 logger.debug(f"Processing {len(message_response.messages)} messages from agent")
653
654 for message in message_response.messages:
655 # Log detailed message attributes for debugging
656 msg_type = getattr(message, 'message_type', 'unknown')
657 has_tool_call_id = hasattr(message, 'tool_call_id')
658 has_status = hasattr(message, 'status')
659 has_tool_return = hasattr(message, 'tool_return')
660
661 logger.debug(f"Message type={msg_type}, has_tool_call_id={has_tool_call_id}, has_status={has_status}, has_tool_return={has_tool_return}")
662
663 # Tool return messages are identified by having tool_return attribute, tool_call_id, and status
664 if has_tool_call_id and has_status and has_tool_return:
665 logger.debug(f" -> tool_call_id={message.tool_call_id}, status={message.status}")
666
667 # Store the result for ANY tool that has a return - we'll match by tool_call_id later
668 tool_call_results[message.tool_call_id] = message.status
669 logger.debug(f"Stored tool result: {message.tool_call_id} -> {message.status}")
670
671 # Handle special processing for ignore_notification
672 if message.status == 'success':
673 result_str = str(message.tool_return) if message.tool_return else ""
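                # Sentinel format produced by the ignore_notification tool: "IGNORED_NOTIFICATION::<category>::<reason>"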
674 if 'IGNORED_NOTIFICATION::' in result_str:
675 parts = result_str.split('::')
676 if len(parts) >= 3:
677 ignore_category = parts[1]
678 ignore_reason = parts[2]
679 ignored_notification = True
680 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
681
682 # Check for deprecated tool in tool call messages
683 elif hasattr(message, 'tool_call') and message.tool_call:
684 if message.tool_call.name == 'bluesky_reply':
685 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
686 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
687 logger.error("Update the agent's tools using register_tools.py")
688 # Export agent state before terminating
689 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
690 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
691 exit(1)
692
693 logger.debug(f"First pass complete. Collected {len(tool_call_results)} tool call results")
694 logger.debug(f"tool_call_results: {tool_call_results}")
695
696 # Second pass: process messages and check for successful tool calls
697 for i, message in enumerate(message_response.messages, 1):
698 # Log concise message info instead of full object
699 msg_type = getattr(message, 'message_type', 'unknown')
700 if hasattr(message, 'reasoning') and message.reasoning:
701 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
702 elif hasattr(message, 'tool_call') and message.tool_call:
703 tool_name = message.tool_call.name
704 logger.debug(f" {i}. {msg_type}: {tool_name}")
705 elif hasattr(message, 'tool_return'):
706 tool_name = getattr(message, 'name', 'unknown_tool')
707 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
708 status = getattr(message, 'status', 'unknown')
709 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
710 elif hasattr(message, 'text'):
711 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
712 else:
713 logger.debug(f" {i}. {msg_type}: <no content>")
714
715 # Check for halt_activity tool call
716 if hasattr(message, 'tool_call') and message.tool_call:
717 if message.tool_call.name == 'halt_activity':
718 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
719 try:
720 args = json.loads(message.tool_call.arguments)
721 reason = args.get('reason', 'Agent requested halt')
722 logger.info(f"Halt reason: {reason}")
                except Exception:
724 logger.info("Halt reason: <unable to parse>")
725
726 # Delete the queue file before terminating
727 if queue_filepath and queue_filepath.exists():
728 queue_filepath.unlink()
729 logger.info(f"Deleted queue file: {queue_filepath.name}")
730
731 # Also mark as processed to avoid reprocessing
732 if NOTIFICATION_DB:
733 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed')
734 else:
735 processed_uris = load_processed_notifications()
736 processed_uris.add(notification_data.get('uri', ''))
737 save_processed_notifications(processed_uris)
738
739 # Export agent state before terminating
740 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
741
742 # Exit the program
743 logger.info("=== BOT TERMINATED BY AGENT ===")
744 exit(0)
745
746 # Check for deprecated bluesky_reply tool
747 if hasattr(message, 'tool_call') and message.tool_call:
748 if message.tool_call.name == 'bluesky_reply':
749 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
750 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
751 logger.error("Update the agent's tools using register_tools.py")
752 # Export agent state before terminating
753 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
754 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
755 exit(1)
756
757 # Collect annotate_ack tool calls
758 elif message.tool_call.name == 'annotate_ack':
759 try:
760 args = json.loads(message.tool_call.arguments)
761 note = args.get('note', '')
762 if note:
763 ack_note = note
764 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
765 except json.JSONDecodeError as e:
766 logger.error(f"Failed to parse annotate_ack arguments: {e}")
767
768 # Collect flag_archival_memory_for_deletion tool calls
769 elif message.tool_call.name == 'flag_archival_memory_for_deletion':
770 try:
771 args = json.loads(message.tool_call.arguments)
772 reason = args.get('reason', '')
773 memory_text = args.get('memory_text', '')
774 confirm = args.get('confirm', False)
775
776 # Only flag for deletion if confirmed and has all required fields
777 if confirm and memory_text and reason:
778 flagged_memories.append({
779 'reason': reason,
780 'memory_text': memory_text
781 })
782 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...")
783 elif not confirm:
784 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...")
785 elif not reason:
786 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...")
787 except json.JSONDecodeError as e:
788 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}")
789
790 # Collect archival_memory_insert tool calls for recording to AT Protocol
791 elif message.tool_call.name == 'archival_memory_insert':
792 try:
793 args = json.loads(message.tool_call.arguments)
794 content = args.get('content', '')
795 tags_str = args.get('tags', None)
796
797 # Parse tags from string representation if present
798 tags = None
799 if tags_str:
800 try:
801 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str
802 except json.JSONDecodeError:
803 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...")
804
805 if content:
806 # Create stream.thought.memory record
807 try:
808 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags)
809 if memory_result:
810 tags_info = f" ({len(tags)} tags)" if tags else ""
811 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}: {content[:100]}...")
812 else:
813 logger.warning(f"Failed to record archival memory to AT Protocol")
814 except Exception as e:
815 logger.error(f"Error creating memory record: {e}")
816 except json.JSONDecodeError as e:
817 logger.error(f"Failed to parse archival_memory_insert arguments: {e}")
818
819 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
820 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
821 tool_call_id = message.tool_call.tool_call_id
822 tool_status = tool_call_results.get(tool_call_id, 'unknown')
823
824 logger.debug(f"Found add_post_to_bluesky_reply_thread tool call: id={tool_call_id}, status={tool_status}")
825 logger.debug(f"Available tool_call_results: {tool_call_results}")
826
827 if tool_status == 'success':
828 try:
829 args = json.loads(message.tool_call.arguments)
830 reply_text = args.get('text', '')
831 reply_lang = args.get('lang', 'en-US')
832
833 if reply_text: # Only add if there's actual content
834 reply_candidates.append((reply_text, reply_lang))
835 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
836 except json.JSONDecodeError as e:
837 logger.error(f"Failed to parse tool call arguments: {e}")
838 elif tool_status == 'error':
839 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
840 else:
841 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
842 logger.warning(f" Tool call ID '{tool_call_id}' not found in tool_call_results dict")
843
844 # Handle archival memory deletion if any were flagged (only if no halt was received)
845 if flagged_memories:
846 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion")
847 for flagged_memory in flagged_memories:
848 reason = flagged_memory['reason']
849 memory_text = flagged_memory['memory_text']
850
851 try:
852 # Search for passages with this exact text
853 logger.debug(f"Searching for passages matching: {memory_text[:100]}...")
854 passages = CLIENT.agents.passages.list(
855 agent_id=void_agent.id,
856 query=memory_text
857 )
858
859 if not passages:
860 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...")
861 continue
862
863 # Delete all matching passages
864 deleted_count = 0
865 for passage in passages:
866 # Check if the passage text exactly matches (to avoid partial matches)
867 if hasattr(passage, 'text') and passage.text == memory_text:
868 try:
869 CLIENT.agents.passages.delete(
870 agent_id=void_agent.id,
871 passage_id=str(passage.id)
872 )
873 deleted_count += 1
874 logger.debug(f"Deleted passage {passage.id}")
875 except Exception as delete_error:
876 logger.error(f"Failed to delete passage {passage.id}: {delete_error}")
877
878 if deleted_count > 0:
879 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...")
880 else:
881 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...")
882
883 except Exception as e:
884 logger.error(f"Error processing memory deletion: {e}")
885
886 # Check for conflicting tool calls
887 if reply_candidates and ignored_notification:
888 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
889 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
890 logger.warning("Item will be left in queue for manual review")
891 # Return False to keep in queue
892 return False
893
894 if reply_candidates:
895 # Aggregate reply posts into a thread
896 reply_messages = []
897 reply_langs = []
898 for text, lang in reply_candidates:
899 reply_messages.append(text)
900 reply_langs.append(lang)
901
902 # Use the first language for the entire thread (could be enhanced later)
903 reply_lang = reply_langs[0] if reply_langs else 'en-US'
904
905 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
906
907 # Display the generated reply thread
908 if len(reply_messages) == 1:
909 content = reply_messages[0]
910 title = f"Reply to @{author_handle}"
911 else:
912 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)])
913 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)"
914
915 # Format with Unicode characters
916 print(f"\n✎ {title}")
917 print(f" {'─' * len(title)}")
918 # Indent content lines
919 for line in content.split('\n'):
920 print(f" {line}")
921
922 # Send the reply(s) with language (unless in testing mode)
923 if testing_mode:
924 logger.info("TESTING MODE: Skipping actual Bluesky post")
925 response = True # Simulate success
926 else:
927 if len(reply_messages) == 1:
928 # Single reply - use existing function
929 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
930 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
931 response = bsky_utils.reply_to_notification(
932 client=atproto_client,
933 notification=notification_data,
934 reply_text=cleaned_text,
935 lang=reply_lang,
936 correlation_id=correlation_id
937 )
938 else:
939 # Multiple replies - use new threaded function
940 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
941 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
942 response = bsky_utils.reply_with_thread_to_notification(
943 client=atproto_client,
944 notification=notification_data,
945 reply_messages=cleaned_messages,
946 lang=reply_lang,
947 correlation_id=correlation_id
948 )
949
950 if response:
951 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={
952 'correlation_id': correlation_id,
953 'author_handle': author_handle,
954 'reply_count': len(reply_messages)
955 })
956
957 # Acknowledge the post we're replying to with stream.thought.ack
958 try:
959 post_uri = notification_data.get('uri')
960 post_cid = notification_data.get('cid')
961
962 if post_uri and post_cid:
963 ack_result = bsky_utils.acknowledge_post(
964 client=atproto_client,
965 post_uri=post_uri,
966 post_cid=post_cid,
967 note=ack_note
968 )
969 if ack_result:
970 if ack_note:
971 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")")
972 else:
973 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack")
974 else:
975 logger.warning(f"Failed to acknowledge post from @{author_handle}")
976 else:
977 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}")
978 except Exception as e:
979 logger.error(f"Error acknowledging post from @{author_handle}: {e}")
980 # Don't fail the entire operation if acknowledgment fails
981
982 return True
983 else:
984 logger.error(f"Failed to send reply to @{author_handle}")
985 return False
986 else:
987 # Check if notification was explicitly ignored
988 if ignored_notification:
989 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={
990 'correlation_id': correlation_id,
991 'author_handle': author_handle,
992 'ignore_category': ignore_category
993 })
994 return "ignored"
995 else:
996 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={
997 'correlation_id': correlation_id,
998 'author_handle': author_handle
999 })
1000 return "no_reply"
1001
1002 except Exception as e:
1003 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={
1004 'correlation_id': correlation_id,
1005 'error': str(e),
1006 'error_type': type(e).__name__,
1007 'author_handle': author_handle if 'author_handle' in locals() else 'unknown'
1008 })
1009 return False
1010 finally:
1011 # Detach user blocks after agent response (success or failure)
1012 if 'attached_handles' in locals() and attached_handles:
1013 try:
1014 logger.info(f"Detaching user blocks for handles: {attached_handles}")
1015 detach_result = detach_user_blocks(attached_handles, void_agent)
1016 logger.debug(f"Detach result: {detach_result}")
1017 except Exception as detach_error:
1018 logger.warning(f"Failed to detach user blocks: {detach_error}")
1019
1020
1021def notification_to_dict(notification):
1022 """Convert a notification object to a dictionary for JSON serialization."""
1023 return {
1024 'uri': notification.uri,
1025 'cid': notification.cid,
1026 'reason': notification.reason,
1027 'is_read': notification.is_read,
1028 'indexed_at': notification.indexed_at,
1029 'author': {
1030 'handle': notification.author.handle,
1031 'display_name': notification.author.display_name,
1032 'did': notification.author.did
1033 },
1034 'record': {
1035 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
1036 }
1037 }
1038
1039
1040def load_processed_notifications():
1041 """Load the set of processed notification URIs from database."""
1042 global NOTIFICATION_DB
1043 if NOTIFICATION_DB:
1044 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)
1045 return set()
1046
1047
1048def save_processed_notifications(processed_set):
1049 """Save the set of processed notification URIs to database."""
1050 # This is now handled by marking individual notifications in the DB
1051 # Keeping function for compatibility but it doesn't need to do anything
1052 pass
1053
1054
1055def save_notification_to_queue(notification, is_priority=None):
1056 """Save a notification to the queue directory with priority-based filename."""
1057 try:
1058 global NOTIFICATION_DB
1059
1060 # Handle both notification objects and dicts
1061 if isinstance(notification, dict):
1062 notif_dict = notification
1063 notification_uri = notification.get('uri')
1064 else:
1065 notif_dict = notification_to_dict(notification)
1066 notification_uri = notification.uri
1067
1068 # Check if already processed (using database if available)
1069 if NOTIFICATION_DB:
1070 if NOTIFICATION_DB.is_processed(notification_uri):
1071 logger.debug(f"Notification already processed (DB): {notification_uri}")
1072 return False
1073 # Add to database - if this fails, don't queue the notification
1074 if not NOTIFICATION_DB.add_notification(notif_dict):
1075 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}")
1076 return False
1077 else:
1078 # Fall back to old JSON method
1079 processed_uris = load_processed_notifications()
1080 if notification_uri in processed_uris:
1081 logger.debug(f"Notification already processed: {notification_uri}")
1082 return False
1083
1084 # Create JSON string
1085 notif_json = json.dumps(notif_dict, sort_keys=True)
1086
1087 # Generate hash for filename (to avoid duplicates)
1088 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
1089
1090 # Determine priority based on author handle or explicit priority
1091 if is_priority is not None:
1092 priority_prefix = "0_" if is_priority else "1_"
1093 else:
1094 if isinstance(notification, dict):
1095 author_handle = notification.get('author', {}).get('handle', '')
1096 else:
1097 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
1098 # Prioritize cameron.pfiffer.org responses
1099 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"
1100
1101 # Create filename with priority, timestamp and hash
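        # e.g. "0_20250115_143022_mention_3f2a9c1b8d4e7f60.json" (hypothetical example)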
1102 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1103 reason = notif_dict.get('reason', 'unknown')
1104 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
1105 filepath = QUEUE_DIR / filename
1106
1107 # Check if this notification URI is already in the queue
1108 for existing_file in QUEUE_DIR.glob("*.json"):
1109 if existing_file.name == "processed_notifications.json":
1110 continue
1111 try:
1112 with open(existing_file, 'r') as f:
1113 existing_data = json.load(f)
1114 if existing_data.get('uri') == notification_uri:
1115 logger.debug(f"Notification already queued (URI: {notification_uri})")
1116 return False
            except Exception:
1118 continue
1119
1120 # Write to file
1121 with open(filepath, 'w') as f:
1122 json.dump(notif_dict, f, indent=2)
1123
1124 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
1125 logger.info(f"Queued notification ({priority_label}): {filename}")
1126 return True
1127
1128 except Exception as e:
1129 logger.error(f"Error saving notification to queue: {e}")
1130 return False
1131
1132
1133def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
1134 """Load and process all notifications from the queue in priority order."""
1135 try:
1136 # Get all JSON files in queue directory (excluding processed_notifications.json)
1137 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
1138 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1139
1140 # Filter out and delete like notifications immediately
1141 queue_files = []
1142 likes_deleted = 0
1143
1144 for filepath in all_queue_files:
1145 try:
1146 with open(filepath, 'r') as f:
1147 notif_data = json.load(f)
1148
1149 # If it's a like, delete it immediately and don't process
1150 if notif_data.get('reason') == 'like':
1151 filepath.unlink()
1152 likes_deleted += 1
1153 logger.debug(f"Deleted like notification: {filepath.name}")
1154 else:
1155 queue_files.append(filepath)
1156 except Exception as e:
1157 logger.warning(f"Error checking notification file {filepath.name}: {e}")
1158 queue_files.append(filepath) # Keep it in case it's valid
1159
1160 if likes_deleted > 0:
1161 logger.info(f"Deleted {likes_deleted} like notifications from queue")
1162
1163 if not queue_files:
1164 return
1165
1166 logger.info(f"Processing {len(queue_files)} queued notifications")
1167
1168 # Log current statistics
1169 elapsed_time = time.time() - start_time
1170 total_messages = sum(message_counters.values())
1171 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1172
1173 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
1174
1175 for i, filepath in enumerate(queue_files, 1):
1176 # Determine if this is a priority notification
1177 is_priority = filepath.name.startswith("0_")
1178
1179 # Check for new notifications periodically during queue processing
1180 # Also check immediately after processing each priority item
1181 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
1182
1183 # If we just processed a priority item, immediately check for new priority notifications
1184 if is_priority and i > 1:
1185 should_check_notifications = True
1186
1187 if should_check_notifications:
1188 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
1189 try:
1190 # Fetch and queue new notifications without processing them
1191 new_count = fetch_and_queue_new_notifications(atproto_client)
1192
1193 if new_count > 0:
1194 logger.info(f"Added {new_count} new notifications to queue")
1195 # Reload the queue files to include the new items
1196 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1197 queue_files = updated_queue_files
1198 logger.info(f"Queue updated: now {len(queue_files)} total items")
1199 except Exception as e:
1200 logger.error(f"Error checking for new notifications: {e}")
1201
1202 priority_label = " [PRIORITY]" if is_priority else ""
1203 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
1204 try:
1205 # Load notification data
1206 with open(filepath, 'r') as f:
1207 notif_data = json.load(f)
1208
1209 # Process based on type using dict data directly
1210 success = False
1211 if notif_data['reason'] == "mention":
1212 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1213 if success:
1214 message_counters['mentions'] += 1
1215 elif notif_data['reason'] == "reply":
1216 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1217 if success:
1218 message_counters['replies'] += 1
1219 elif notif_data['reason'] == "follow":
1220 author_handle = notif_data['author']['handle']
1221 author_display_name = notif_data['author'].get('display_name', 'no display name')
1222 follow_update = f"@{author_handle} ({author_display_name}) started following you."
1223 follow_message = f"Update: {follow_update}"
1224 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
1225 CLIENT.agents.messages.create(
1226 agent_id = void_agent.id,
1227 messages = [{"role":"user", "content": follow_message}]
1228 )
1229 success = True # Follow updates are always successful
1230 if success:
1231 message_counters['follows'] += 1
1232 elif notif_data['reason'] == "repost":
1233 # Skip reposts silently
1234 success = True # Skip reposts but mark as successful to remove from queue
1235 if success:
1236 message_counters['reposts_skipped'] += 1
1237 elif notif_data['reason'] == "like":
1238 # Skip likes silently
1239 success = True # Skip likes but mark as successful to remove from queue
1240 if success:
                        message_counters['likes_skipped'] += 1
1243 else:
1244 logger.warning(f"Unknown notification type: {notif_data['reason']}")
1245 success = True # Remove unknown types from queue
1246
                # Handle file based on processing result.
                # Compare with "is True" so string results ("no_reply", "ignored")
                # reach their dedicated branches below instead of being treated as success.
                if success is True:
1249 if testing_mode:
1250 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
1251 else:
1252 filepath.unlink()
1253 logger.info(f"Successfully processed and removed: {filepath.name}")
1254
1255 # Mark as processed to avoid reprocessing
1256 if NOTIFICATION_DB:
1257 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed')
1258 else:
1259 processed_uris = load_processed_notifications()
1260 processed_uris.add(notif_data['uri'])
1261 save_processed_notifications(processed_uris)
1262
1263 elif success is None: # Special case for moving to error directory
1264 error_path = QUEUE_ERROR_DIR / filepath.name
1265 filepath.rename(error_path)
1266 logger.warning(f"Moved {filepath.name} to errors directory")
1267
1268 # Also mark as processed to avoid retrying
1269 if NOTIFICATION_DB:
1270 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1271 else:
1272 processed_uris = load_processed_notifications()
1273 processed_uris.add(notif_data['uri'])
1274 save_processed_notifications(processed_uris)
1275
1276 elif success == "no_reply": # Special case for moving to no_reply directory
1277 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
1278 filepath.rename(no_reply_path)
1279 logger.info(f"Moved {filepath.name} to no_reply directory")
1280
1281 # Also mark as processed to avoid retrying
1282 if NOTIFICATION_DB:
1283 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1284 else:
1285 processed_uris = load_processed_notifications()
1286 processed_uris.add(notif_data['uri'])
1287 save_processed_notifications(processed_uris)
1288
1289 elif success == "ignored": # Special case for explicitly ignored notifications
1290 # For ignored notifications, we just delete them (not move to no_reply)
1291 filepath.unlink()
1292 logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
1293
1294 # Also mark as processed to avoid retrying
1295 if NOTIFICATION_DB:
1296 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1297 else:
1298 processed_uris = load_processed_notifications()
1299 processed_uris.add(notif_data['uri'])
1300 save_processed_notifications(processed_uris)
1301
1302 else:
1303 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
1304
1305 except Exception as e:
1306 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
1307 # Keep the file for retry later
1308
1309 except Exception as e:
1310 logger.error(f"Error loading queued notifications: {e}")
1311
1312
1313def fetch_and_queue_new_notifications(atproto_client):
1314 """Fetch new notifications and queue them without processing."""
1315 try:
1316 global NOTIFICATION_DB
1317
1318 # Get current time for marking notifications as seen
1319 logger.debug("Getting current time for notification marking...")
1320 last_seen_at = atproto_client.get_current_time_iso()
1321
1322 # Get timestamp of last processed notification for filtering
1323 last_processed_time = None
1324 if NOTIFICATION_DB:
1325 last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
1326 if last_processed_time:
1327 logger.debug(f"Last processed notification was at: {last_processed_time}")
1328
1329 # Fetch ALL notifications using pagination
1330 all_notifications = []
1331 cursor = None
1332 page_count = 0
1333 max_pages = 20 # Safety limit to prevent infinite loops
1334
1335 while page_count < max_pages:
1336 try:
1337 # Fetch notifications page
1338 if cursor:
1339 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1340 params={'cursor': cursor, 'limit': 100}
1341 )
1342 else:
1343 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1344 params={'limit': 100}
1345 )
1346
1347 page_count += 1
1348 page_notifications = notifications_response.notifications
1349
1350 if not page_notifications:
1351 break
1352
1353 all_notifications.extend(page_notifications)
1354
1355 # Check if there are more pages
1356 cursor = getattr(notifications_response, 'cursor', None)
1357 if not cursor:
1358 break
1359
1360 except Exception as e:
1361 logger.error(f"Error fetching notifications page {page_count}: {e}")
1362 break
1363
1364 # Now process all fetched notifications
1365 new_count = 0
1366 if all_notifications:
1367 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")
1368
1369 # Mark as seen first
1370 try:
1371 atproto_client.app.bsky.notification.update_seen(
1372 data={'seenAt': last_seen_at}
1373 )
1374 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
1375 except Exception as e:
1376 logger.error(f"Error marking notifications as seen: {e}")
1377
1378 # Debug counters
1379 skipped_read = 0
1380 skipped_likes = 0
1381 skipped_processed = 0
1382 skipped_old_timestamp = 0
1383 processed_uris = load_processed_notifications()
1384
1385 # Queue all new notifications (except likes)
1386 for notif in all_notifications:
1387 # Skip if older than last processed (when we have timestamp filtering)
1388 if last_processed_time and hasattr(notif, 'indexed_at'):
1389 if notif.indexed_at <= last_processed_time:
1390 skipped_old_timestamp += 1
1391 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
1392 continue
1393
1394 # Debug: Log is_read status but DON'T skip based on it
1395 if hasattr(notif, 'is_read') and notif.is_read:
1396 skipped_read += 1
1397 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")
1398
1399 # Skip likes
1400 if hasattr(notif, 'reason') and notif.reason == 'like':
1401 skipped_likes += 1
1402 continue
1403
1404 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif
1405
1406 # Skip likes in dict form too
1407 if notif_dict.get('reason') == 'like':
1408 continue
1409
1410 # Check if already processed
1411 notif_uri = notif_dict.get('uri', '')
1412 if notif_uri in processed_uris:
1413 skipped_processed += 1
1414 logger.debug(f"Skipping already processed: {notif_uri}")
1415 continue
1416
1417 # Check if it's a priority notification
1418 is_priority = False
1419
1420 # Priority for cameron.pfiffer.org notifications
1421 author_handle = notif_dict.get('author', {}).get('handle', '')
1422 if author_handle == "cameron.pfiffer.org":
1423 is_priority = True
1424
1425 # Also check for priority keywords in mentions
1426 if notif_dict.get('reason') == 'mention':
1427 # Get the mention text to check for priority keywords
1428 record = notif_dict.get('record', {})
1429 text = record.get('text', '')
1430 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']):
1431 is_priority = True
1432
1433 if save_notification_to_queue(notif_dict, is_priority=is_priority):
1434 new_count += 1
1435 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")
1436
1437 # Log summary of filtering
1438 logger.info(f"📊 Notification processing summary:")
1439 logger.info(f" • Total fetched: {len(all_notifications)}")
1440 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)")
1441 logger.info(f" • Skipped (likes): {skipped_likes}")
1442 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}")
1443 logger.info(f" • Skipped (already processed): {skipped_processed}")
1444 logger.info(f" • Queued for processing: {new_count}")
1445 else:
1446 logger.debug("No new notifications to queue")
1447
1448 return new_count
1449
1450 except Exception as e:
1451 logger.error(f"Error fetching and queueing notifications: {e}")
1452 return 0
1453
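# Filtering order for fetched notifications (as implemented above):
#   1. skip anything indexed at or before the last processed timestamp
#   2. count (but do not skip) notifications already marked is_read
#   3. skip likes, in both attribute and dict form
#   4. skip URIs already recorded as processed
#   5. flag the rest as priority when authored by cameron.pfiffer.org, or when a
#      mention's text contains an urgency keyword, then queue them.
#
# Illustrative priority check on a plain dict (field names mirror the
# notification dict above; the values are made up):
#
#   notif = {"reason": "mention",
#            "author": {"handle": "someone.bsky.social"},
#            "record": {"text": "urgent: the bot seems stuck"}}
#   is_priority = (notif["author"]["handle"] == "cameron.pfiffer.org"
#                  or any(k in notif["record"]["text"].lower()
#                         for k in ("urgent", "priority", "important", "emergency")))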
1454
1455def process_notifications(void_agent, atproto_client, testing_mode=False):
1456 """Fetch new notifications, queue them, and process the queue."""
1457 try:
1458 # Fetch and queue new notifications
1459 new_count = fetch_and_queue_new_notifications(atproto_client)
1460
1461 if new_count > 0:
1462 logger.info(f"Found {new_count} new notifications to process")
1463
1464 # Now process the entire queue (old + new notifications)
1465 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
1466
1467 except Exception as e:
1468 logger.error(f"Error processing notifications: {e}")
1469
1470
1471def send_synthesis_message(client: Letta, agent_id: str, agent_name: str = "void", atproto_client=None) -> None:
1472 """
1473    Send a periodic synthesis message to the agent (interval is configured in main; default every 10 minutes).
1474 This prompts the agent to synthesize its recent experiences.
1475
1476 Args:
1477 client: Letta client
1478 agent_id: Agent ID to send synthesis to
1479 agent_name: Agent name for temporal block labels
1480 atproto_client: Optional AT Protocol client for posting synthesis results
1481 """
1482 # Track attached temporal blocks for cleanup
1483 attached_temporal_labels = []
1484
1485 try:
1486 logger.info("🧠 Preparing synthesis with temporal journal blocks")
1487
1488 # Attach temporal blocks before synthesis
1489 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id, agent_name)
1490 if not success:
1491 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")
1492
1493 # Create synthesis prompt with agent-specific block names
1494 today = date.today()
1495 synthesis_prompt = f"""Time for synthesis.
1496
1497This is your periodic opportunity to reflect on recent experiences and update your memory.
1498
1499The following temporal journal blocks are temporarily available for this session:
1500- {agent_name}_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
1501- {agent_name}_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
1502- {agent_name}_year_{today.year}: This year's journal ({today.year})
1503
1504You may use these blocks as you see fit. Synthesize your recent experiences into your memory as appropriate."""
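        # For agent_name="void" on 2025-06-15 (an illustrative date), the labels
        # referenced in the prompt above are void_day_2025_06_15,
        # void_month_2025_06, and void_year_2025 - the same labels that
        # attach_temporal_blocks() creates and attaches.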
1505
1506 logger.info("🧠 Sending enhanced synthesis prompt to agent")
1507
1508 # Send synthesis message with streaming to show tool use
1509 message_stream = client.agents.messages.create_stream(
1510 agent_id=agent_id,
1511 messages=[{"role": "user", "content": synthesis_prompt}],
1512 stream_tokens=False,
1513 max_steps=100
1514 )
1515
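        # The stream below yields typed chunks; the loop branches on message_type:
        #   reasoning_message   - the agent's internal reasoning (optionally printed,
        #                         mirrored to an AT Protocol reasoning record)
        #   tool_call_message   - tool invocations (logged and mirrored to AT Protocol)
        #   tool_return_message - success/error status of each tool call
        #   assistant_message   - the synthesis text itself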
1516 # Track synthesis content for potential posting
1517 synthesis_posts = []
1518 ack_note = None
1519
1520 # Process the streaming response
1521 for chunk in message_stream:
1522 if hasattr(chunk, 'message_type'):
1523 if chunk.message_type == 'reasoning_message':
1524 if SHOW_REASONING:
1525 print("\n◆ Reasoning")
1526 print(" ─────────")
1527 for line in chunk.reasoning.split('\n'):
1528 print(f" {line}")
1529
1530 # Create ATProto record for reasoning (if we have atproto client)
1531 if atproto_client and hasattr(chunk, 'reasoning'):
1532 try:
1533 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
1534 except Exception as e:
1535 logger.debug(f"Failed to create reasoning record during synthesis: {e}")
1536 elif chunk.message_type == 'tool_call_message':
1537 tool_name = chunk.tool_call.name
1538
1539 # Create ATProto record for tool call (if we have atproto client)
1540 if atproto_client:
1541 try:
1542 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
1543 bsky_utils.create_tool_call_record(
1544 atproto_client,
1545 tool_name,
1546 chunk.tool_call.arguments,
1547 tool_call_id
1548 )
1549 except Exception as e:
1550 logger.debug(f"Failed to create tool call record during synthesis: {e}")
1551 try:
1552 args = json.loads(chunk.tool_call.arguments)
1553 if tool_name == 'archival_memory_search':
1554 query = args.get('query', 'unknown')
1555 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
1556 elif tool_name == 'archival_memory_insert':
1557 content = args.get('content', '')
1558 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
1559
1560 # Record archival memory insert to AT Protocol
1561 if atproto_client:
1562 try:
1563 tags_str = args.get('tags', None)
1564 tags = None
1565 if tags_str:
1566 try:
1567 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str
1568 except json.JSONDecodeError:
1569 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...")
1570
1571 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags)
1572 if memory_result:
1573 tags_info = f" ({len(tags)} tags)" if tags else ""
1574 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}")
1575 except Exception as e:
1576 logger.debug(f"Failed to create memory record during synthesis: {e}")
1577 elif tool_name == 'update_block':
1578 label = args.get('label', 'unknown')
1579 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', ''))
1580 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
1581 elif tool_name == 'annotate_ack':
1582 note = args.get('note', '')
1583 if note:
1584 ack_note = note
1585 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
1586 elif tool_name == 'add_post_to_bluesky_reply_thread':
1587 text = args.get('text', '')
1588 synthesis_posts.append(text)
1589 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
1590 else:
1591 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
1592 if len(args_str) > 150:
1593 args_str = args_str[:150] + "..."
1594 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
1595                    except Exception:
1596                        log_with_panel(str(chunk.tool_call.arguments)[:150] + "...", f"Tool call: {tool_name}", "blue")
1597 elif chunk.message_type == 'tool_return_message':
1598 if chunk.status == 'success':
1599 log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green")
1600 else:
1601 log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red")
1602 elif chunk.message_type == 'assistant_message':
1603 print("\n▶ Synthesis Response")
1604 print(" ──────────────────")
1605 for line in chunk.content.split('\n'):
1606 print(f" {line}")
1607
1608 if str(chunk) == 'done':
1609 break
1610
1611 logger.info("🧠 Synthesis message processed successfully")
1612
1613 # Handle synthesis acknowledgments if we have an atproto client
1614 if atproto_client and ack_note:
1615 try:
1616 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
1617 if result:
1618 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
1619 else:
1620 logger.warning("Failed to create synthesis acknowledgment")
1621 except Exception as e:
1622 logger.error(f"Error creating synthesis acknowledgment: {e}")
1623
1624 # Handle synthesis posts if any were generated
1625 if atproto_client and synthesis_posts:
1626 try:
1627 for post_text in synthesis_posts:
1628 cleaned_text = bsky_utils.remove_outside_quotes(post_text)
1629 response = bsky_utils.send_post(atproto_client, cleaned_text)
1630 if response:
1631 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
1632 else:
1633 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
1634 except Exception as e:
1635 logger.error(f"Error posting synthesis content: {e}")
1636
1637 except Exception as e:
1638 logger.error(f"Error sending synthesis message: {e}")
1639 finally:
1640 # Always detach temporal blocks after synthesis
1641 if attached_temporal_labels:
1642 logger.info("🧠 Detaching temporal journal blocks after synthesis")
1643 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels, agent_name)
1644 if not detach_success:
1645 logger.warning("Some temporal blocks may not have been detached properly")
1646
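# Usage (as in main() below): synthesis runs on a timer, and the finally block
# above guarantees the temporal journal blocks are detached even when the Letta
# call fails:
#
#   send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)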
1647
1648def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
1649 """
1650 Detach all user blocks from the agent to prevent memory bloat.
1651 This should be called periodically to ensure clean state.
1652 """
1653 try:
1654 # Get all blocks attached to the agent
1655 attached_blocks = client.agents.blocks.list(agent_id=agent_id)
1656
1657 user_blocks_to_detach = []
1658 for block in attached_blocks:
1659 if hasattr(block, 'label') and block.label.startswith('user_'):
1660 user_blocks_to_detach.append({
1661 'label': block.label,
1662 'id': block.id
1663 })
1664
1665 if not user_blocks_to_detach:
1666 logger.debug("No user blocks found to detach during periodic cleanup")
1667 return
1668
1669 # Detach each user block
1670 detached_count = 0
1671 for block_info in user_blocks_to_detach:
1672 try:
1673 client.agents.blocks.detach(
1674 agent_id=agent_id,
1675 block_id=str(block_info['id'])
1676 )
1677 detached_count += 1
1678 logger.debug(f"Detached user block: {block_info['label']}")
1679 except Exception as e:
1680 logger.warning(f"Failed to detach block {block_info['label']}: {e}")
1681
1682 if detached_count > 0:
1683 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")
1684
1685 except Exception as e:
1686 logger.error(f"Error during periodic user block cleanup: {e}")
1687
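# Note: this assumes per-user memory blocks follow a "user_" label prefix
# convention; any attached block whose label starts with "user_" is considered
# safe to detach here.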
1688
1689def attach_temporal_blocks(client: Letta, agent_id: str, agent_name: str = "void") -> tuple:
1690 """
1691 Attach temporal journal blocks (day, month, year) to the agent for synthesis.
1692 Creates blocks if they don't exist.
1693
1694 Args:
1695 client: Letta client
1696 agent_id: Agent ID
1697 agent_name: Agent name for prefixing block labels (prevents collision across agents)
1698
1699 Returns:
1700 Tuple of (success: bool, attached_labels: list)
1701 """
1702 try:
1703 today = date.today()
1704
1705 # Generate temporal block labels with agent-specific prefix
1706 day_label = f"{agent_name}_day_{today.strftime('%Y_%m_%d')}"
1707 month_label = f"{agent_name}_month_{today.strftime('%Y_%m')}"
1708 year_label = f"{agent_name}_year_{today.year}"
1709
1710 temporal_labels = [day_label, month_label, year_label]
1711 attached_labels = []
1712
1713 # Get current blocks attached to agent
1714 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1715 current_block_labels = {block.label for block in current_blocks}
1716 current_block_ids = {str(block.id) for block in current_blocks}
1717
1718 for label in temporal_labels:
1719 try:
1720 # Skip if already attached
1721 if label in current_block_labels:
1722 logger.debug(f"Temporal block already attached: {label}")
1723 attached_labels.append(label)
1724 continue
1725
1726 # Check if block exists globally
1727 blocks = client.blocks.list(label=label)
1728
1729 if blocks and len(blocks) > 0:
1730 block = blocks[0]
1731 # Check if already attached by ID
1732 if str(block.id) in current_block_ids:
1733 logger.debug(f"Temporal block already attached by ID: {label}")
1734 attached_labels.append(label)
1735 continue
1736 else:
1737 # Create new temporal block with appropriate header
1738                    if "_day_" in label:
1739 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}"
1740 initial_content = f"{header}\n\nNo entries yet for today."
1741                    elif "_month_" in label:
1742 header = f"# Monthly Journal - {today.strftime('%B %Y')}"
1743 initial_content = f"{header}\n\nNo entries yet for this month."
1744 else: # year
1745 header = f"# Yearly Journal - {today.year}"
1746 initial_content = f"{header}\n\nNo entries yet for this year."
1747
1748 block = client.blocks.create(
1749 label=label,
1750 value=initial_content,
1751 limit=10000 # Larger limit for journal blocks
1752 )
1753 logger.info(f"Created new temporal block: {label}")
1754
1755 # Attach the block
1756 client.agents.blocks.attach(
1757 agent_id=agent_id,
1758 block_id=str(block.id)
1759 )
1760 attached_labels.append(label)
1761 logger.info(f"Attached temporal block: {label}")
1762
1763 except Exception as e:
1764 # Check for duplicate constraint errors
1765 error_str = str(e)
1766 if "duplicate key value violates unique constraint" in error_str:
1767 logger.debug(f"Temporal block already attached (constraint): {label}")
1768 attached_labels.append(label)
1769 else:
1770 logger.warning(f"Failed to attach temporal block {label}: {e}")
1771
1772 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
1773 return True, attached_labels
1774
1775 except Exception as e:
1776 logger.error(f"Error attaching temporal blocks: {e}")
1777 return False, []
1778
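# attach_temporal_blocks() is intended to be idempotent: labels that are already
# attached are skipped, missing blocks are created (limit=10000), and a
# "duplicate key value violates unique constraint" error from the server is
# treated as "already attached" rather than as a failure.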
1779
1780def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None, agent_name: str = "void") -> bool:
1781 """
1782 Detach temporal journal blocks from the agent after synthesis.
1783
1784 Args:
1785 client: Letta client
1786 agent_id: Agent ID
1787 labels_to_detach: Optional list of specific labels to detach.
1788 If None, detaches all temporal blocks for this agent.
1789 agent_name: Agent name for prefixing block labels (prevents collision across agents)
1790
1791 Returns:
1792 bool: Success status
1793 """
1794 try:
1795 # If no specific labels provided, generate today's labels
1796 if labels_to_detach is None:
1797 today = date.today()
1798 labels_to_detach = [
1799 f"{agent_name}_day_{today.strftime('%Y_%m_%d')}",
1800 f"{agent_name}_month_{today.strftime('%Y_%m')}",
1801 f"{agent_name}_year_{today.year}"
1802 ]
1803
1804 # Get current blocks and build label to ID mapping
1805 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1806 block_label_to_id = {block.label: str(block.id) for block in current_blocks}
1807
1808 detached_count = 0
1809 for label in labels_to_detach:
1810 if label in block_label_to_id:
1811 try:
1812 client.agents.blocks.detach(
1813 agent_id=agent_id,
1814 block_id=block_label_to_id[label]
1815 )
1816 detached_count += 1
1817 logger.debug(f"Detached temporal block: {label}")
1818 except Exception as e:
1819 logger.warning(f"Failed to detach temporal block {label}: {e}")
1820 else:
1821 logger.debug(f"Temporal block not attached: {label}")
1822
1823 logger.info(f"Detached {detached_count} temporal blocks")
1824 return True
1825
1826 except Exception as e:
1827 logger.error(f"Error detaching temporal blocks: {e}")
1828 return False
1829
1830
1831def main():
1832 # Parse command line arguments
1833 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
1834 parser.add_argument('--config', type=str, default='config.yaml', help='Path to config file (default: config.yaml)')
1835 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
1836 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
1837 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
1838 # --rich option removed as we now use simple text formatting
1839 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
1840 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
1841 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
1842 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
1843 parser.add_argument('--debug', action='store_true', help='Enable debug logging')
1844 args = parser.parse_args()
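    # Example invocations (the script filename here is illustrative):
    #   python bot.py --config config.yaml --reasoning
    #   python bot.py --synthesis-only --synthesis-interval 900
    #   python bot.py --test --debug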
1845
1846 # Initialize configuration with custom path
1847 global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB
1848 get_config(args.config) # Initialize the global config instance
1849 letta_config = get_letta_config()
1850
1851 # Initialize queue paths from config
1852 queue_config = get_queue_config()
1853 QUEUE_DIR = Path(queue_config['base_dir'])
1854 QUEUE_ERROR_DIR = Path(queue_config['error_dir'])
1855 QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir'])
1856 PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file'])
1857
1858 # Create queue directories
1859 QUEUE_DIR.mkdir(exist_ok=True)
1860 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
1861 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
1862
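    # The queue section of config.yaml is expected to provide roughly the
    # following keys (paths are illustrative; exact nesting depends on
    # config_loader.get_queue_config):
    #
    #   queue:
    #     base_dir: queue
    #     error_dir: queue/errors
    #     no_reply_dir: queue/no_reply
    #     processed_file: queue/processed_notifications.json
    #     db_path: queue/notifications.db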
1863 # Create Letta client with configuration
1864 CLIENT_PARAMS = {
1865 'token': letta_config['api_key'],
1866 'timeout': letta_config['timeout']
1867 }
1868 if letta_config.get('base_url'):
1869 CLIENT_PARAMS['base_url'] = letta_config['base_url']
1870 CLIENT = Letta(**CLIENT_PARAMS)
1871
1872 # Configure logging based on command line arguments
1873 if args.simple_logs:
1874 log_format = "void - %(levelname)s - %(message)s"
1875 else:
1876 # Create custom formatter with symbols
1877 class SymbolFormatter(logging.Formatter):
1878 """Custom formatter that adds symbols for different log levels"""
1879
1880 SYMBOLS = {
1881 logging.DEBUG: '◇',
1882 logging.INFO: '✓',
1883 logging.WARNING: '⚠',
1884 logging.ERROR: '✗',
1885 logging.CRITICAL: '‼'
1886 }
1887
1888 def format(self, record):
1889 # Get the symbol for this log level
1890 symbol = self.SYMBOLS.get(record.levelno, '•')
1891
1892 # Format time as HH:MM:SS
1893 timestamp = self.formatTime(record, "%H:%M:%S")
1894
1895 # Build the formatted message
1896 level_name = f"{record.levelname:<5}" # Left-align, 5 chars
1897
1898 # Use vertical bar as separator
1899 parts = [symbol, timestamp, '│', level_name, '│', record.getMessage()]
1900
1901 return ' '.join(parts)
1902
1903 # Reset logging configuration
1904 for handler in logging.root.handlers[:]:
1905 logging.root.removeHandler(handler)
1906
1907 # Create handler with custom formatter
1908 handler = logging.StreamHandler()
1909 if not args.simple_logs:
1910 handler.setFormatter(SymbolFormatter())
1911 else:
1912 handler.setFormatter(logging.Formatter(log_format))
1913
1914 # Configure root logger
1915 logging.root.setLevel(logging.INFO)
1916 logging.root.addHandler(handler)
1917
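    # With the symbol formatter a log line looks like:
    #   ✓ 14:02:31 │ INFO  │ Found 3 new notifications to process
    # and with --simple-logs:
    #   void - INFO - Found 3 new notifications to process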
1918    global logger, prompt_logger
1919 logger = logging.getLogger("void_bot")
1920 logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
1921
1922 # Create a separate logger for prompts (set to WARNING to hide by default)
1923 prompt_logger = logging.getLogger("void_bot.prompts")
1924 if args.reasoning:
1925 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used
1926 else:
1927 prompt_logger.setLevel(logging.WARNING) # Hide by default
1928
1929 # Disable httpx logging completely
1930 logging.getLogger("httpx").setLevel(logging.CRITICAL)
1931
1932    # Rich console removed - output uses plain text formatting
1934
1935 global TESTING_MODE, SKIP_GIT, SHOW_REASONING
1936 TESTING_MODE = args.test
1937
1938 # Store no-git flag globally for use in export_agent_state calls
1939 SKIP_GIT = args.no_git
1940
1941    # Rich formatting removed - no flag to store
1943
1944 # Store reasoning flag globally
1945 SHOW_REASONING = args.reasoning
1946
1947 if TESTING_MODE:
1948 logger.info("=== RUNNING IN TESTING MODE ===")
1949 logger.info(" - No messages will be sent to Bluesky")
1950 logger.info(" - Queue files will not be deleted")
1951 logger.info(" - Notifications will not be marked as seen")
1952 print("\n")
1953
1954 # Check for synthesis-only mode
1955 SYNTHESIS_ONLY = args.synthesis_only
1956 if SYNTHESIS_ONLY:
1957 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
1958 logger.info(" - Only synthesis messages will be sent")
1959 logger.info(" - No notification processing")
1960 logger.info(" - No Bluesky client needed")
1961 print("\n")
1962    # Main bot loop: continuously monitor for notifications.
1963 global start_time
1964 start_time = time.time()
1965 logger.info("=== STARTING VOID BOT ===")
1966 void_agent = initialize_void()
1967 logger.info(f"Void agent initialized: {void_agent.id}")
1968
1969 # Initialize notification database with config-based path
1970 logger.info("Initializing notification database...")
1971 NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path'])
1972
1973 # Migrate from old JSON format if it exists
1974 if PROCESSED_NOTIFICATIONS_FILE.exists():
1975 logger.info("Found old processed_notifications.json, migrating to database...")
1976 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))
1977
1978 # Log database stats
1979 db_stats = NOTIFICATION_DB.get_stats()
1980 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")
1981
1982 # Clean up old records
1983 NOTIFICATION_DB.cleanup_old_records(days=7)
1984
1985 # Ensure correct tools are attached for Bluesky
1986 logger.info("Configuring tools for Bluesky platform...")
1987 try:
1988 from tool_manager import ensure_platform_tools
1989 ensure_platform_tools('bluesky', void_agent.id)
1990 except Exception as e:
1991 logger.error(f"Failed to configure platform tools: {e}")
1992 logger.warning("Continuing with existing tool configuration")
1993
1994 # Check if agent has required tools
1995 if hasattr(void_agent, 'tools') and void_agent.tools:
1996 tool_names = [tool.name for tool in void_agent.tools]
1997 # Check for bluesky-related tools
1998 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
1999 if not bluesky_tools:
2000 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
2001 else:
2002 logger.warning("Agent has no tools registered!")
2003
2004 # Clean up all user blocks at startup
2005 logger.info("🧹 Cleaning up user blocks at startup...")
2006 periodic_user_block_cleanup(CLIENT, void_agent.id)
2007
2008 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
2009 if not SYNTHESIS_ONLY:
2010 atproto_client = bsky_utils.default_login()
2011 logger.info("Connected to Bluesky")
2012 else:
2013 # In synthesis-only mode, still connect for acks and posts (unless in test mode)
2014 if not args.test:
2015 atproto_client = bsky_utils.default_login()
2016 logger.info("Connected to Bluesky (for synthesis acks/posts)")
2017 else:
2018 atproto_client = None
2019 logger.info("Skipping Bluesky connection (test mode)")
2020
2021 # Configure intervals
2022 CLEANUP_INTERVAL = args.cleanup_interval
2023 SYNTHESIS_INTERVAL = args.synthesis_interval
2024
2025 # Synthesis-only mode
2026 if SYNTHESIS_ONLY:
2027 if SYNTHESIS_INTERVAL <= 0:
2028 logger.error("Synthesis-only mode requires --synthesis-interval > 0")
2029 return
2030
2031 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
2032
2033 while True:
2034 try:
2035                # Send synthesis message (fires immediately on the first pass, then after each interval)
2036 logger.info("🧠 Sending synthesis message")
2037 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)
2038
2039 # Wait for next interval
2040 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
2041 sleep(SYNTHESIS_INTERVAL)
2042
2043 except KeyboardInterrupt:
2044 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
2045                    return  # leave main(); synthesis-only mode must not fall through to normal notification processing
2046 except Exception as e:
2047 logger.error(f"Error in synthesis loop: {e}")
2048 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
2049 sleep(SYNTHESIS_INTERVAL)
2050
2051 # Normal mode with notification processing
2052 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
2053
2054 cycle_count = 0
2055
2056 if CLEANUP_INTERVAL > 0:
2057 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
2058 else:
2059 logger.info("User block cleanup disabled")
2060
2061 if SYNTHESIS_INTERVAL > 0:
2062 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
2063 else:
2064 logger.info("Synthesis messages disabled")
2065
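    # Each cycle: fetch/queue/process notifications, trigger synthesis once the
    # interval has elapsed, and on every CLEANUP_INTERVAL-th cycle detach user
    # blocks, sync followers when autofollow is enabled, and check queue/database
    # health.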
2066 while True:
2067 try:
2068 cycle_count += 1
2069 process_notifications(void_agent, atproto_client, TESTING_MODE)
2070
2071 # Check if synthesis interval has passed
2072 if SYNTHESIS_INTERVAL > 0:
2073 current_time = time.time()
2074 global last_synthesis_time
2075 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
2076 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
2077 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)
2078 last_synthesis_time = current_time
2079
2080 # Run periodic cleanup every N cycles
2081 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
2082 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
2083 periodic_user_block_cleanup(CLIENT, void_agent.id)
2084
2085 # Check if autofollow is enabled and sync followers
2086 from config_loader import get_bluesky_config
2087 bluesky_config = get_bluesky_config()
2088 if bluesky_config.get('autofollow', False):
2089 logger.info("🔄 Syncing followers (autofollow enabled)")
2090 try:
2091 sync_result = bsky_utils.sync_followers(atproto_client, dry_run=False)
2092 if 'error' in sync_result:
2093 logger.error(f"Autofollow failed: {sync_result['error']}")
2094 else:
2095 if sync_result['newly_followed']:
2096 logger.info(f"✓ Followed {len(sync_result['newly_followed'])} new users: {', '.join(sync_result['newly_followed'])}")
2097 else:
2098 logger.debug(f"No new followers to follow back ({sync_result['followers_count']} followers, {sync_result['following_count']} following)")
2099 if sync_result.get('errors'):
2100 logger.warning(f"Some follows failed: {len(sync_result['errors'])} errors")
2101 except Exception as e:
2102 logger.error(f"Error during autofollow sync: {e}")
2103
2104 # Also check database health when doing cleanup
2105 if NOTIFICATION_DB:
2106 db_stats = NOTIFICATION_DB.get_stats()
2107 pending = db_stats.get('status_pending', 0)
2108 errors = db_stats.get('status_error', 0)
2109
2110 if pending > 50:
2111 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
2112 if errors > 20:
2113 logger.warning(f"⚠️ Queue health check: {errors} error notifications")
2114
2115 # Periodic cleanup of old records
2116 if cycle_count % (CLEANUP_INTERVAL * 10) == 0: # Every 100 cycles
2117 logger.info("Running database cleanup of old records...")
2118 NOTIFICATION_DB.cleanup_old_records(days=7)
2119
2120 # Log cycle completion with stats
2121 elapsed_time = time.time() - start_time
2122 total_messages = sum(message_counters.values())
2123 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
2124
2125 if total_messages > 0:
2126 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
2127 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
2128
2129 except KeyboardInterrupt:
2130 # Final stats
2131 elapsed_time = time.time() - start_time
2132 total_messages = sum(message_counters.values())
2133 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
2134
2135 logger.info("=== BOT STOPPED BY USER ===")
2136 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
2137 logger.info(f" - {message_counters['mentions']} mentions")
2138 logger.info(f" - {message_counters['replies']} replies")
2139 logger.info(f" - {message_counters['follows']} follows")
2140 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
2141 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
2142
2143 # Close database connection
2144 if NOTIFICATION_DB:
2145 logger.info("Closing database connection...")
2146 NOTIFICATION_DB.close()
2147
2148 break
2149 except Exception as e:
2150 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
2151 logger.error(f"Error details: {e}")
2152 # Wait a bit longer on errors
2153 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
2154 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
2155
2156
2157if __name__ == "__main__":
2158 main()