# a digital person for Bluesky
# Rich imports removed - using simple text formatting
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string, count_thread_posts
import os
import logging
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import time
import argparse

from utils import (
    upsert_block,
    upsert_agent
)
from config_loader import get_letta_config, get_config, get_queue_config

import bsky_utils
from datetime import date
from notification_db import NotificationDB

def extract_handles_from_data(data):
    """Recursively extract all unique handles from nested data structure."""
    handles = set()

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            # Check if this dict has a 'handle' key
            if 'handle' in obj:
                handles.add(obj['handle'])
            # Recursively check all values
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, list):
            # Recursively check all list items
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(handles)

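# Example (illustrative, hypothetical data): for a nested structure such as
#   {"author": {"handle": "alice.example"}, "reply": {"author": {"handle": "bob.example"}}}
# extract_handles_from_data returns ["alice.example", "bob.example"]
# (handles are deduplicated via the set; ordering is not guaranteed).
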
# Logging will be configured after argument parsing
logger = None
prompt_logger = None
# Simple text formatting (Rich no longer used)
SHOW_REASONING = False
last_archival_query = "archival memory search"

def log_with_panel(message, title=None, border_color="white"):
    """Log a message with Unicode box-drawing characters"""
    if title:
        # Map old color names to appropriate symbols
        symbol_map = {
            "blue": "⚙",    # Tool calls
            "green": "✓",   # Success/completion
            "yellow": "◆",  # Reasoning
            "red": "✗",     # Errors
            "white": "▶",   # Default/mentions
            "cyan": "✎",    # Posts
        }
        symbol = symbol_map.get(border_color, "▶")

        print(f"\n{symbol} {title}")
        print(f" {'─' * len(title)}")
        # Indent message lines
        for line in message.split('\n'):
            print(f" {line}")
    else:
        print(message)


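# Example (illustrative): log_with_panel("Memory block updated", "Tool result: update_block", "green")
# prints roughly:
#
#  ✓ Tool result: update_block
#   ──────────────────────────
#   Memory block updated
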
# Load Letta configuration from config.yaml (will be initialized later with custom path if provided)
letta_config = None
CLIENT = None

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 10  # Check every 10 seconds for faster response

# Check for new notifications every N queue items
CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2  # Check more frequently during processing

# Queue paths (will be initialized from config in main())
QUEUE_DIR = None
QUEUE_ERROR_DIR = None
QUEUE_NO_REPLY_DIR = None
PROCESSED_NOTIFICATIONS_FILE = None

# Maximum number of processed notifications to track
MAX_PROCESSED_NOTIFICATIONS = 10000

# Message tracking counters
message_counters = defaultdict(int)
start_time = time.time()

# Testing mode flag
TESTING_MODE = False

# Skip git operations flag
SKIP_GIT = False

# Synthesis message tracking
last_synthesis_time = time.time()

# Database for notification tracking
NOTIFICATION_DB = None

def export_agent_state(client, agent, skip_git=False):
    """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
    try:
        # Confirm export with user unless git is being skipped
        if not skip_git:
            response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
            if response not in ['y', 'yes']:
                logger.info("Agent export cancelled by user.")
                return
        else:
            logger.info("Exporting agent state (git staging disabled)")

        # Create directories if they don't exist
        os.makedirs("agent_archive", exist_ok=True)
        os.makedirs("agents", exist_ok=True)

        # Export agent data
        logger.info(f"Exporting agent {agent.id}. This takes some time...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        # Save timestamped archive copy
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
        with open(archive_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        # Save current agent state
        current_file = os.path.join("agents", "void.af")
        with open(current_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        logger.info(f"Agent exported to {archive_file} and {current_file}")

        # Git add only the current agent file (archive is ignored) unless skip_git is True
        if not skip_git:
            try:
                subprocess.run(["git", "add", current_file], check=True, capture_output=True)
                logger.info("Added current agent file to git staging")
            except subprocess.CalledProcessError as e:
                logger.warning(f"Failed to git add agent file: {e}")

    except Exception as e:
        logger.error(f"Failed to export agent: {e}")

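# Example (illustrative): an export run at 2025-01-15 09:30:00 would write
#   agent_archive/void_20250115_093000.af   (timestamped archive copy)
#   agents/void.af                          (current state; staged via `git add` unless skip_git is set)
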
def initialize_void():
    logger.info("Starting void agent initialization...")

    # Get the configured void agent by ID
    logger.info("Loading void agent from config...")
    agent_id = letta_config['agent_id']

    try:
        void_agent = CLIENT.agents.retrieve(agent_id=agent_id)
        logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})")
    except Exception as e:
        logger.error(f"Failed to load void agent {agent_id}: {e}")
        logger.error("Please ensure the agent_id in config.yaml is correct")
        raise e

    # Export agent state
    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)

    # Log agent details
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
    if hasattr(void_agent, 'llm_config'):
        logger.info(f"Agent model: {void_agent.llm_config.model}")
    if hasattr(void_agent, 'project_id') and void_agent.project_id:
        logger.info(f"Agent project_id: {void_agent.project_id}")
    if hasattr(void_agent, 'tools'):
        logger.info(f"Agent has {len(void_agent.tools)} tools")
        for tool in void_agent.tools[:3]:  # Show first 3 tools
            logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")

    return void_agent


def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
    """Process a mention and generate a reply using the Letta agent.

    Args:
        void_agent: The Letta agent instance
        atproto_client: The AT Protocol client
        notification_data: The notification data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, skip posting to Bluesky and writing AT Protocol records

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
        "ignored": Notification was explicitly ignored by the agent, delete from queue
    """
204 import uuid
205
206 # Generate correlation ID for tracking this notification through the pipeline
207 correlation_id = str(uuid.uuid4())[:8]
208
209 try:
210 logger.info(f"[{correlation_id}] Starting process_mention", extra={
211 'correlation_id': correlation_id,
212 'notification_type': type(notification_data).__name__
213 })
214
215 # Handle both dict and object inputs for backwards compatibility
216 if isinstance(notification_data, dict):
217 uri = notification_data['uri']
218 mention_text = notification_data.get('record', {}).get('text', '')
219 author_handle = notification_data['author']['handle']
220 author_name = notification_data['author'].get('display_name') or author_handle
221 else:
222 # Legacy object access
223 uri = notification_data.uri
224 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
225 author_handle = notification_data.author.handle
226 author_name = notification_data.author.display_name or author_handle
227
228 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={
229 'correlation_id': correlation_id,
230 'author_handle': author_handle,
231 'author_name': author_name,
232 'mention_uri': uri,
233 'mention_text_length': len(mention_text),
234 'mention_preview': mention_text[:100] if mention_text else ''
235 })
236
237 # Check if handle is in allowed list (if configured)
238 allowed_handles = get_config().get('bot.allowed_handles', [])
239 if allowed_handles and author_handle not in allowed_handles:
240 logger.info(f"[{correlation_id}] Skipping mention from @{author_handle} (not in allowed_handles)")
241 return True # Remove from queue
242
243 # Retrieve the entire thread associated with the mention
244 try:
245 thread = atproto_client.app.bsky.feed.get_post_thread({
246 'uri': uri,
247 'parent_height': 40,
248 'depth': 10
249 })
250 except Exception as e:
251 error_str = str(e)
252 # Check if this is a NotFound error
253 if 'NotFound' in error_str or 'Post not found' in error_str:
254 logger.warning(f"Post not found for URI {uri}, removing from queue")
255 return True # Return True to remove from queue
256 elif 'InternalServerError' in error_str:
257 # Bluesky sometimes returns InternalServerError for deleted posts
258 # Verify if post actually exists using getRecord
259 try:
260 parts = uri.replace('at://', '').split('/')
261 repo, collection, rkey = parts[0], parts[1], parts[2]
262 atproto_client.com.atproto.repo.get_record({
263 'repo': repo, 'collection': collection, 'rkey': rkey
264 })
265 # Post exists, this is a real server error - re-raise
266 logger.error(f"Error fetching thread (post exists, server error): {e}")
267 raise
268 except Exception as verify_e:
269 if 'RecordNotFound' in str(verify_e) or 'not found' in str(verify_e).lower():
270 logger.warning(f"Post deleted (verified via getRecord), removing from queue: {uri}")
271 return True # Remove from queue
272 # Some other verification error, re-raise original
273 logger.error(f"Error fetching thread: {e}")
274 raise
275 else:
276 # Re-raise other errors
277 logger.error(f"Error fetching thread: {e}")
278 raise
279
280 # Check thread length against configured maximum
281 max_thread_posts = get_config().get('bot.max_thread_posts', 0)
282 if max_thread_posts > 0:
283 thread_post_count = count_thread_posts(thread)
284 if thread_post_count > max_thread_posts:
285 logger.info(f"Thread too long ({thread_post_count} posts > {max_thread_posts} max), skipping this mention")
286 return True # Return True to remove from queue
287
288 # Get thread context as YAML string
289 logger.debug("Converting thread to YAML string")
290 try:
291 thread_context = thread_to_yaml_string(thread)
292 logger.debug(f"Thread context generated, length: {len(thread_context)} characters")
293
294 # Check if #voidstop appears anywhere in the thread
295 if "#voidstop" in thread_context.lower():
296 logger.info("Found #voidstop in thread context, skipping this mention")
297 return True # Return True to remove from queue
298
299 # Also check the mention text directly
300 if "#voidstop" in mention_text.lower():
301 logger.info("Found #voidstop in mention text, skipping this mention")
302 return True # Return True to remove from queue
303
304 # Create a more informative preview by extracting meaningful content
305 lines = thread_context.split('\n')
306 meaningful_lines = []
307
308 for line in lines:
309 stripped = line.strip()
310 if not stripped:
311 continue
312
313 # Look for lines with actual content (not just structure)
314 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
315 meaningful_lines.append(line)
316 if len(meaningful_lines) >= 5:
317 break
318
319 if meaningful_lines:
320 preview = '\n'.join(meaningful_lines)
321 logger.debug(f"Thread content preview:\n{preview}")
322 else:
323 # If no content fields found, just show it's a thread structure
324 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
325 except Exception as yaml_error:
326 import traceback
327 logger.error(f"Error converting thread to YAML: {yaml_error}")
328 logger.error(f"Full traceback:\n{traceback.format_exc()}")
329 logger.error(f"Thread type: {type(thread)}")
330 if hasattr(thread, '__dict__'):
331 logger.error(f"Thread attributes: {thread.__dict__}")
332 # Try to continue with a simple context
333 thread_context = f"Error processing thread context: {str(yaml_error)}"
334
335 # Create a prompt for the Letta agent with thread context
336 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
337
338MOST RECENT POST:
339"{mention_text}"
340
341THREAD CONTEXT:
342```yaml
343{thread_context}
344```
345
346If you choose to reply, use the add_post_to_bluesky_reply_thread tool. Each call creates one post (max 300 characters). You may use multiple calls to create a thread if needed."""
347
348 # Extract all handles from notification and thread data
349 all_handles = set()
350 all_handles.update(extract_handles_from_data(notification_data))
351 all_handles.update(extract_handles_from_data(thread.model_dump()))
352 unique_handles = list(all_handles)
353
354 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
355
356 # Check if any handles are in known_bots list
357 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs
358 import json
359
360 try:
361 # Check for known bots in thread
362 bot_check_result = check_known_bots(unique_handles, void_agent)
363 bot_check_data = json.loads(bot_check_result)
364
365 # TEMPORARILY DISABLED: Bot detection causing issues with normal users
366 # TODO: Re-enable after debugging why normal users are being flagged as bots
367 if False: # bot_check_data.get("bot_detected", False):
368 detected_bots = bot_check_data.get("detected_bots", [])
369 logger.info(f"Bot detected in thread: {detected_bots}")
370
371 # Decide whether to respond (10% chance)
372 if not should_respond_to_bot_thread():
373 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}")
374 # Return False to keep in queue for potential later processing
375 return False
376 else:
377 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}")
378 else:
379 logger.debug("Bot detection disabled - processing all notifications")
380
381 except Exception as bot_check_error:
382 logger.warning(f"Error checking for bots: {bot_check_error}")
383 # Continue processing if bot check fails
384
385 # Get response from Letta agent
386 # Format with Unicode characters
387 title = f"MENTION FROM @{author_handle}"
388 print(f"\n▶ {title}")
389 print(f" {'═' * len(title)}")
390 # Indent the mention text
391 for line in mention_text.split('\n'):
392 print(f" {line}")
393
394 # Log prompt details to separate logger
395 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
396
397 # Log concise prompt info to main logger
398 thread_handles_count = len(unique_handles)
399 prompt_char_count = len(prompt)
400 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars")
401
402 try:
403 # Use streaming to avoid 524 timeout errors
404 message_stream = CLIENT.agents.messages.create_stream(
405 agent_id=void_agent.id,
406 messages=[{"role": "user", "content": prompt}],
407 stream_tokens=False, # Step streaming only (faster than token streaming)
408 max_steps=100
409 )
410
411 # Collect the streaming response
412 all_messages = []
413 for chunk in message_stream:
414 # Log condensed chunk info
415 if hasattr(chunk, 'message_type'):
416 if chunk.message_type == 'reasoning_message':
                        # Print the full reasoning without truncation
                        # (output format is the same whether or not SHOW_REASONING is set)
                        print("\n◆ Reasoning")
                        print(" ─────────")
                        # Indent reasoning lines
                        for line in chunk.reasoning.split('\n'):
                            print(f" {line}")
433
434 # Create ATProto record for reasoning (unless in testing mode)
435 if not testing_mode and hasattr(chunk, 'reasoning'):
436 try:
437 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
438 except Exception as e:
439 logger.debug(f"Failed to create reasoning record: {e}")
440 elif chunk.message_type == 'tool_call_message':
441 # Parse tool arguments for better display
442 tool_name = chunk.tool_call.name
443
444 # Create ATProto record for tool call (unless in testing mode)
445 if not testing_mode:
446 try:
447 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
448 bsky_utils.create_tool_call_record(
449 atproto_client,
450 tool_name,
451 chunk.tool_call.arguments,
452 tool_call_id
453 )
454 except Exception as e:
455 logger.debug(f"Failed to create tool call record: {e}")
456
457 try:
458 args = json.loads(chunk.tool_call.arguments)
459 # Format based on tool type
460 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']:
461 # Extract the text being posted
462 text = args.get('text', '')
463 if text:
464 # Format with Unicode characters
465 print("\n✎ Bluesky Post")
466 print(" ────────────")
467 # Indent post text
468 for line in text.split('\n'):
469 print(f" {line}")
470 else:
471 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
472 elif tool_name == 'archival_memory_search':
473 query = args.get('query', 'unknown')
474 global last_archival_query
475 last_archival_query = query
476 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
477 elif tool_name == 'archival_memory_insert':
478 content = args.get('content', '')
479 # Show the full content being inserted
480 log_with_panel(content, f"Tool call: {tool_name}", "blue")
481 elif tool_name == 'update_block':
482 label = args.get('label', 'unknown')
483 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
484 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
485 else:
486 # Generic display for other tools
487 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
488 if len(args_str) > 150:
489 args_str = args_str[:150] + "..."
490 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
491 except:
492 # Fallback to original format if parsing fails
493 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
494 elif chunk.message_type == 'tool_return_message':
495 # Enhanced tool result logging
496 tool_name = chunk.name
497 status = chunk.status
498
499 if status == 'success':
500 # Try to show meaningful result info based on tool type
501 if hasattr(chunk, 'tool_return') and chunk.tool_return:
502 result_str = str(chunk.tool_return)
503 if tool_name == 'archival_memory_search':
504
505 try:
506 # Handle both string and list formats
507 if isinstance(chunk.tool_return, str):
508 # The string format is: "([{...}, {...}], count)"
509 # We need to extract just the list part
510 if chunk.tool_return.strip():
511 # Find the list part between the first [ and last ]
512 start_idx = chunk.tool_return.find('[')
513 end_idx = chunk.tool_return.rfind(']')
514 if start_idx != -1 and end_idx != -1:
515 list_str = chunk.tool_return[start_idx:end_idx+1]
516 # Use ast.literal_eval since this is Python literal syntax, not JSON
517 import ast
518 results = ast.literal_eval(list_str)
519 else:
520 logger.warning("Could not find list in archival_memory_search result")
521 results = []
522 else:
523 logger.warning("Empty string returned from archival_memory_search")
524 results = []
525 else:
526 # If it's already a list, use directly
527 results = chunk.tool_return
528
529 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green")
530
531 # Use the captured search query from the tool call
532 search_query = last_archival_query
533
534 # Combine all results into a single text block
535 content_text = ""
536 for i, entry in enumerate(results, 1):
537 timestamp = entry.get('timestamp', 'N/A')
538 content = entry.get('content', '')
539 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n"
540
541 # Format with Unicode characters
542 title = f"{search_query} ({len(results)} results)"
543 print(f"\n⚙ {title}")
544 print(f" {'─' * len(title)}")
545 # Indent content text
546 for line in content_text.strip().split('\n'):
547 print(f" {line}")
548
549 except Exception as e:
550 logger.error(f"Error formatting archival memory results: {e}")
551 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green")
552 elif tool_name == 'add_post_to_bluesky_reply_thread':
553 # Just show success for bluesky posts, the text was already shown in tool call
554 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green")
555 elif tool_name == 'archival_memory_insert':
556 # Skip archival memory insert results (always returns None)
557 pass
558 elif tool_name == 'update_block':
559 log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green")
560 else:
561 # Generic success with preview
562 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
563 log_with_panel(preview, f"Tool result: {tool_name} ✓", "green")
564 else:
565 log_with_panel("Success", f"Tool result: {tool_name} ✓", "green")
566 elif status == 'error':
567 # Show error details
568 if tool_name == 'add_post_to_bluesky_reply_thread':
569 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred"
570 log_with_panel(error_str, f"Bluesky Post ✗", "red")
571 elif tool_name == 'archival_memory_insert':
572 # Skip archival memory insert errors too
573 pass
574 else:
575 error_preview = ""
576 if hasattr(chunk, 'tool_return') and chunk.tool_return:
577 error_str = str(chunk.tool_return)
578 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
579 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red")
580 else:
581 log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red")
582 else:
583 logger.info(f"Tool result: {tool_name} - {status}")
584 elif chunk.message_type == 'assistant_message':
585 # Format with Unicode characters
586 print("\n▶ Assistant Response")
587 print(" ──────────────────")
588 # Indent response text
589 for line in chunk.content.split('\n'):
590 print(f" {line}")
591 elif chunk.message_type == 'error_message':
592 # Dump full error object
593 logger.error(f"Agent error_message: {chunk}")
594 if hasattr(chunk, 'model_dump'):
595 logger.error(f"Agent error (dict): {chunk.model_dump()}")
596 elif hasattr(chunk, '__dict__'):
597 logger.error(f"Agent error (vars): {vars(chunk)}")
598 else:
599 # Filter out verbose message types
600 if chunk.message_type not in ['usage_statistics', 'stop_reason']:
601 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...")
602 else:
603 logger.info(f"📦 Stream status: {chunk}")
604
605 # Log full chunk for debugging
606 logger.debug(f"Full streaming chunk: {chunk}")
607 all_messages.append(chunk)
608 if str(chunk) == 'done':
609 break
610
611 # Convert streaming response to standard format for compatibility
612 message_response = type('StreamingResponse', (), {
613 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
614 })()
615 except Exception as api_error:
616 import traceback
617 error_str = str(api_error)
618 logger.error(f"Letta API error: {api_error}")
619 logger.error(f"Error type: {type(api_error).__name__}")
620 logger.error(f"Full traceback:\n{traceback.format_exc()}")
621 logger.error(f"Mention text was: {mention_text}")
622 logger.error(f"Author: @{author_handle}")
623 logger.error(f"URI: {uri}")
624
625
626 # Try to extract more info from different error types
627 if hasattr(api_error, 'response'):
628 logger.error(f"Error response object exists")
629 if hasattr(api_error.response, 'text'):
630 logger.error(f"Response text: {api_error.response.text}")
631 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
632 try:
633 logger.error(f"Response JSON: {api_error.response.json()}")
634 except:
635 pass
636
637 # Check for specific error types
638 if hasattr(api_error, 'status_code'):
639 logger.error(f"API Status code: {api_error.status_code}")
640 if hasattr(api_error, 'body'):
641 logger.error(f"API Response body: {api_error.body}")
642 if hasattr(api_error, 'headers'):
643 logger.error(f"API Response headers: {api_error.headers}")
644
            # Use getattr so a missing status_code attribute cannot raise inside the handler
            if getattr(api_error, 'status_code', None) == 413:
                logger.error("413 Payload Too Large - moving to errors directory")
                return None  # Move to errors directory - payload is too large to ever succeed
            elif getattr(api_error, 'status_code', None) == 524:
                logger.error("524 error - timeout from Cloudflare, will retry later")
                return False  # Keep in queue for retry
651
652 # Check if error indicates we should remove from queue
653 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
654 logger.warning("Payload too large error, moving to errors directory")
655 return None # Move to errors directory - cannot be fixed by retry
656 elif 'status_code: 524' in error_str:
657 logger.warning("524 timeout error, keeping in queue for retry")
658 return False # Keep in queue for retry
659
660 raise
661
662 # Log successful response
663 logger.debug("Successfully received response from Letta API")
664 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
665
666 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
667 reply_candidates = []
668 tool_call_results = {} # Map tool_call_id to status
669 ack_note = None # Track any note from annotate_ack tool
670 flagged_memories = [] # Track memories flagged for deletion
671
672 logger.debug(f"Processing {len(message_response.messages)} response messages...")
673
674 # First pass: collect tool return statuses
675 ignored_notification = False
676 ignore_reason = ""
677 ignore_category = ""
678
679 logger.debug(f"Processing {len(message_response.messages)} messages from agent")
680
681 for message in message_response.messages:
682 # Log detailed message attributes for debugging
683 msg_type = getattr(message, 'message_type', 'unknown')
684 has_tool_call_id = hasattr(message, 'tool_call_id')
685 has_status = hasattr(message, 'status')
686 has_tool_return = hasattr(message, 'tool_return')
687
688 logger.debug(f"Message type={msg_type}, has_tool_call_id={has_tool_call_id}, has_status={has_status}, has_tool_return={has_tool_return}")
689
690 # Tool return messages are identified by having tool_return attribute, tool_call_id, and status
691 if has_tool_call_id and has_status and has_tool_return:
692 logger.debug(f" -> tool_call_id={message.tool_call_id}, status={message.status}")
693
694 # Store the result for ANY tool that has a return - we'll match by tool_call_id later
695 tool_call_results[message.tool_call_id] = message.status
696 logger.debug(f"Stored tool result: {message.tool_call_id} -> {message.status}")
697
698 # Handle special processing for ignore_notification
699 if message.status == 'success':
700 result_str = str(message.tool_return) if message.tool_return else ""
701 if 'IGNORED_NOTIFICATION::' in result_str:
702 parts = result_str.split('::')
703 if len(parts) >= 3:
704 ignore_category = parts[1]
705 ignore_reason = parts[2]
706 ignored_notification = True
707 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
708
709 # Check for deprecated tool in tool call messages
710 elif hasattr(message, 'tool_call') and message.tool_call:
711 if message.tool_call.name == 'bluesky_reply':
712 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
713 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
714 logger.error("Update the agent's tools using register_tools.py")
715 # Export agent state before terminating
716 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
717 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
718 exit(1)
719
720 logger.debug(f"First pass complete. Collected {len(tool_call_results)} tool call results")
721 logger.debug(f"tool_call_results: {tool_call_results}")
722
723 # Second pass: process messages and check for successful tool calls
724 for i, message in enumerate(message_response.messages, 1):
725 # Log concise message info instead of full object
726 msg_type = getattr(message, 'message_type', 'unknown')
727 if hasattr(message, 'reasoning') and message.reasoning:
728 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
729 elif hasattr(message, 'tool_call') and message.tool_call:
730 tool_name = message.tool_call.name
731 logger.debug(f" {i}. {msg_type}: {tool_name}")
732 elif hasattr(message, 'tool_return'):
733 tool_name = getattr(message, 'name', 'unknown_tool')
734 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
735 status = getattr(message, 'status', 'unknown')
736 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
737 elif hasattr(message, 'text'):
738 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
739 else:
740 logger.debug(f" {i}. {msg_type}: <no content>")
741
742 # Check for halt_activity tool call
743 if hasattr(message, 'tool_call') and message.tool_call:
744 if message.tool_call.name == 'halt_activity':
745 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
746 try:
747 args = json.loads(message.tool_call.arguments)
748 reason = args.get('reason', 'Agent requested halt')
749 logger.info(f"Halt reason: {reason}")
750 except:
751 logger.info("Halt reason: <unable to parse>")
752
753 # Delete the queue file before terminating
754 if queue_filepath and queue_filepath.exists():
755 queue_filepath.unlink()
756 logger.info(f"Deleted queue file: {queue_filepath.name}")
757
758 # Also mark as processed to avoid reprocessing
759 if NOTIFICATION_DB:
760 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed')
761 else:
762 processed_uris = load_processed_notifications()
763 processed_uris.add(notification_data.get('uri', ''))
764 save_processed_notifications(processed_uris)
765
766 # Export agent state before terminating
767 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
768
769 # Exit the program
770 logger.info("=== BOT TERMINATED BY AGENT ===")
771 exit(0)
772
773 # Check for deprecated bluesky_reply tool
774 if hasattr(message, 'tool_call') and message.tool_call:
775 if message.tool_call.name == 'bluesky_reply':
776 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
777 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
778 logger.error("Update the agent's tools using register_tools.py")
779 # Export agent state before terminating
780 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
781 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
782 exit(1)
783
784 # Collect annotate_ack tool calls
785 elif message.tool_call.name == 'annotate_ack':
786 try:
787 args = json.loads(message.tool_call.arguments)
788 note = args.get('note', '')
789 if note:
790 ack_note = note
791 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
792 except json.JSONDecodeError as e:
793 logger.error(f"Failed to parse annotate_ack arguments: {e}")
794
795 # Collect flag_archival_memory_for_deletion tool calls
796 elif message.tool_call.name == 'flag_archival_memory_for_deletion':
797 try:
798 args = json.loads(message.tool_call.arguments)
799 reason = args.get('reason', '')
800 memory_text = args.get('memory_text', '')
801 confirm = args.get('confirm', False)
802
803 # Only flag for deletion if confirmed and has all required fields
804 if confirm and memory_text and reason:
805 flagged_memories.append({
806 'reason': reason,
807 'memory_text': memory_text
808 })
809 logger.debug(f"Found memory flagged for deletion (reason: {reason}): {memory_text[:50]}...")
810 elif not confirm:
811 logger.debug(f"Memory deletion not confirmed, skipping: {memory_text[:50]}...")
812 elif not reason:
813 logger.warning(f"Memory deletion missing reason, skipping: {memory_text[:50]}...")
814 except json.JSONDecodeError as e:
815 logger.error(f"Failed to parse flag_archival_memory_for_deletion arguments: {e}")
816
817 # Collect archival_memory_insert tool calls for recording to AT Protocol
818 elif message.tool_call.name == 'archival_memory_insert':
819 try:
820 args = json.loads(message.tool_call.arguments)
821 content = args.get('content', '')
822 tags_str = args.get('tags', None)
823
824 # Parse tags from string representation if present
825 tags = None
826 if tags_str:
827 try:
828 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str
829 except json.JSONDecodeError:
830 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...")
831
832 if content:
833 # Create stream.thought.memory record
834 try:
835 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags)
836 if memory_result:
837 tags_info = f" ({len(tags)} tags)" if tags else ""
838 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}: {content[:100]}...")
839 else:
840 logger.warning(f"Failed to record archival memory to AT Protocol")
841 except Exception as e:
842 logger.error(f"Error creating memory record: {e}")
843 except json.JSONDecodeError as e:
844 logger.error(f"Failed to parse archival_memory_insert arguments: {e}")
845
846 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
847 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
848 tool_call_id = message.tool_call.tool_call_id
849 tool_status = tool_call_results.get(tool_call_id, 'unknown')
850
851 logger.debug(f"Found add_post_to_bluesky_reply_thread tool call: id={tool_call_id}, status={tool_status}")
852 logger.debug(f"Available tool_call_results: {tool_call_results}")
853
854 if tool_status == 'success':
855 try:
856 args = json.loads(message.tool_call.arguments)
857 reply_text = args.get('text', '')
858 reply_lang = args.get('lang', 'en-US')
859
860 if reply_text: # Only add if there's actual content
861 reply_candidates.append((reply_text, reply_lang))
862 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
863 except json.JSONDecodeError as e:
864 logger.error(f"Failed to parse tool call arguments: {e}")
865 elif tool_status == 'error':
866 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
867 else:
868 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
869 logger.warning(f" Tool call ID '{tool_call_id}' not found in tool_call_results dict")
870
871 # Handle archival memory deletion if any were flagged (only if no halt was received)
872 if flagged_memories:
873 logger.info(f"Processing {len(flagged_memories)} flagged memories for deletion")
874 for flagged_memory in flagged_memories:
875 reason = flagged_memory['reason']
876 memory_text = flagged_memory['memory_text']
877
878 try:
879 # Search for passages with this exact text
880 logger.debug(f"Searching for passages matching: {memory_text[:100]}...")
881 passages = CLIENT.agents.passages.list(
882 agent_id=void_agent.id,
883 query=memory_text
884 )
885
886 if not passages:
887 logger.warning(f"No passages found matching flagged memory: {memory_text[:50]}...")
888 continue
889
890 # Delete all matching passages
891 deleted_count = 0
892 for passage in passages:
893 # Check if the passage text exactly matches (to avoid partial matches)
894 if hasattr(passage, 'text') and passage.text == memory_text:
895 try:
896 CLIENT.agents.passages.delete(
897 agent_id=void_agent.id,
898 passage_id=str(passage.id)
899 )
900 deleted_count += 1
901 logger.debug(f"Deleted passage {passage.id}")
902 except Exception as delete_error:
903 logger.error(f"Failed to delete passage {passage.id}: {delete_error}")
904
905 if deleted_count > 0:
906 logger.info(f"🗑️ Deleted {deleted_count} archival memory passage(s) (reason: {reason}): {memory_text[:50]}...")
907 else:
908 logger.warning(f"No exact matches found for deletion: {memory_text[:50]}...")
909
910 except Exception as e:
911 logger.error(f"Error processing memory deletion: {e}")
912
913 # Check for conflicting tool calls
914 if reply_candidates and ignored_notification:
915 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
916 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
917 logger.warning("Item will be left in queue for manual review")
918 # Return False to keep in queue
919 return False
920
921 if reply_candidates:
922 # Aggregate reply posts into a thread
923 reply_messages = []
924 reply_langs = []
925 for text, lang in reply_candidates:
926 reply_messages.append(text)
927 reply_langs.append(lang)
928
929 # Use the first language for the entire thread (could be enhanced later)
930 reply_lang = reply_langs[0] if reply_langs else 'en-US'
931
932 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
933
934 # Display the generated reply thread
935 if len(reply_messages) == 1:
936 content = reply_messages[0]
937 title = f"Reply to @{author_handle}"
938 else:
939 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)])
940 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)"
941
942 # Format with Unicode characters
943 print(f"\n✎ {title}")
944 print(f" {'─' * len(title)}")
945 # Indent content lines
946 for line in content.split('\n'):
947 print(f" {line}")
948
949 # Send the reply(s) with language (unless in testing mode)
950 if testing_mode:
951 logger.info("TESTING MODE: Skipping actual Bluesky post")
952 response = True # Simulate success
953 else:
954 if len(reply_messages) == 1:
955 # Single reply - use existing function
956 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
957 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
958 response = bsky_utils.reply_to_notification(
959 client=atproto_client,
960 notification=notification_data,
961 reply_text=cleaned_text,
962 lang=reply_lang,
963 correlation_id=correlation_id
964 )
965 else:
966 # Multiple replies - use new threaded function
967 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
968 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
969 response = bsky_utils.reply_with_thread_to_notification(
970 client=atproto_client,
971 notification=notification_data,
972 reply_messages=cleaned_messages,
973 lang=reply_lang,
974 correlation_id=correlation_id
975 )
976
977 if response:
978 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={
979 'correlation_id': correlation_id,
980 'author_handle': author_handle,
981 'reply_count': len(reply_messages)
982 })
983
984 # Acknowledge the post we're replying to with stream.thought.ack
985 try:
986 post_uri = notification_data.get('uri')
987 post_cid = notification_data.get('cid')
988
989 if post_uri and post_cid:
990 ack_result = bsky_utils.acknowledge_post(
991 client=atproto_client,
992 post_uri=post_uri,
993 post_cid=post_cid,
994 note=ack_note
995 )
996 if ack_result:
997 if ack_note:
998 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")")
999 else:
1000 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack")
1001 else:
1002 logger.warning(f"Failed to acknowledge post from @{author_handle}")
1003 else:
1004 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}")
1005 except Exception as e:
1006 logger.error(f"Error acknowledging post from @{author_handle}: {e}")
1007 # Don't fail the entire operation if acknowledgment fails
1008
1009 return True
1010 else:
1011 logger.error(f"Failed to send reply to @{author_handle}")
1012 return False
1013 else:
1014 # Check if notification was explicitly ignored
1015 if ignored_notification:
1016 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={
1017 'correlation_id': correlation_id,
1018 'author_handle': author_handle,
1019 'ignore_category': ignore_category
1020 })
1021 return "ignored"
1022 else:
1023 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={
1024 'correlation_id': correlation_id,
1025 'author_handle': author_handle
1026 })
1027 return "no_reply"
1028
1029 except Exception as e:
1030 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={
1031 'correlation_id': correlation_id,
1032 'error': str(e),
1033 'error_type': type(e).__name__,
1034 'author_handle': author_handle if 'author_handle' in locals() else 'unknown'
1035 })
1036 return False
1037
1038
1039def notification_to_dict(notification):
1040 """Convert a notification object to a dictionary for JSON serialization."""
1041 return {
1042 'uri': notification.uri,
1043 'cid': notification.cid,
1044 'reason': notification.reason,
1045 'is_read': notification.is_read,
1046 'indexed_at': notification.indexed_at,
1047 'author': {
1048 'handle': notification.author.handle,
1049 'display_name': notification.author.display_name,
1050 'did': notification.author.did
1051 },
1052 'record': {
1053 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
1054 }
1055 }
1056
1057
1058def load_processed_notifications():
1059 """Load the set of processed notification URIs from database."""
1060 global NOTIFICATION_DB
1061 if NOTIFICATION_DB:
1062 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)
1063 return set()
1064
1065
1066def save_processed_notifications(processed_set):
1067 """Save the set of processed notification URIs to database."""
1068 # This is now handled by marking individual notifications in the DB
1069 # Keeping function for compatibility but it doesn't need to do anything
1070 pass
1071
1072
1073def save_notification_to_queue(notification, is_priority=None):
1074 """Save a notification to the queue directory with priority-based filename."""
1075 try:
1076 global NOTIFICATION_DB
1077
1078 # Handle both notification objects and dicts
1079 if isinstance(notification, dict):
1080 notif_dict = notification
1081 notification_uri = notification.get('uri')
1082 else:
1083 notif_dict = notification_to_dict(notification)
1084 notification_uri = notification.uri
1085
1086 # Check if already processed (using database if available)
1087 if NOTIFICATION_DB:
1088 if NOTIFICATION_DB.is_processed(notification_uri):
1089 logger.debug(f"Notification already processed (DB): {notification_uri}")
1090 return False
1091 # Add to database - if this fails, don't queue the notification
1092 if not NOTIFICATION_DB.add_notification(notif_dict):
1093 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}")
1094 return False
1095 else:
1096 # Fall back to old JSON method
1097 processed_uris = load_processed_notifications()
1098 if notification_uri in processed_uris:
1099 logger.debug(f"Notification already processed: {notification_uri}")
1100 return False
1101
1102 # Create JSON string
1103 notif_json = json.dumps(notif_dict, sort_keys=True)
1104
1105 # Generate hash for filename (to avoid duplicates)
1106 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
1107
1108 # Determine priority based on author handle or explicit priority
1109 if is_priority is not None:
1110 priority_prefix = "0_" if is_priority else "1_"
1111 else:
1112 if isinstance(notification, dict):
1113 author_handle = notification.get('author', {}).get('handle', '')
1114 else:
1115 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
1116 # Prioritize cameron.pfiffer.org responses
1117 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"
1118
1119 # Create filename with priority, timestamp and hash
1120 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1121 reason = notif_dict.get('reason', 'unknown')
1122 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
1123 filepath = QUEUE_DIR / filename
1124
1125 # Check if this notification URI is already in the queue
1126 for existing_file in QUEUE_DIR.glob("*.json"):
1127 if existing_file.name == "processed_notifications.json":
1128 continue
1129 try:
1130 with open(existing_file, 'r') as f:
1131 existing_data = json.load(f)
1132 if existing_data.get('uri') == notification_uri:
1133 logger.debug(f"Notification already queued (URI: {notification_uri})")
1134 return False
1135 except:
1136 continue
1137
1138 # Write to file
1139 with open(filepath, 'w') as f:
1140 json.dump(notif_dict, f, indent=2)
1141
1142 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
1143 logger.info(f"Queued notification ({priority_label}): {filename}")
1144 return True
1145
1146 except Exception as e:
1147 logger.error(f"Error saving notification to queue: {e}")
1148 return False
1149
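# Example (illustrative; the actual directory comes from the queue config): a mention from
# cameron.pfiffer.org queued at 2025-01-15 09:30:00 would be saved as something like
#   0_20250115_093000_mention_<16-char sha256 prefix>.json
# The "0_" / "1_" prefix makes priority items sort ahead of normal ones when the queue is processed.
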
1150
1151def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
1152 """Load and process all notifications from the queue in priority order."""
1153 try:
1154 # Get all JSON files in queue directory (excluding processed_notifications.json)
1155 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
1156 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1157
1158 # Filter out and delete like notifications immediately
1159 queue_files = []
1160 likes_deleted = 0
1161
1162 for filepath in all_queue_files:
1163 try:
1164 with open(filepath, 'r') as f:
1165 notif_data = json.load(f)
1166
1167 # If it's a like, delete it immediately and don't process
1168 if notif_data.get('reason') == 'like':
1169 filepath.unlink()
1170 likes_deleted += 1
1171 logger.debug(f"Deleted like notification: {filepath.name}")
1172 else:
1173 queue_files.append(filepath)
1174 except Exception as e:
1175 logger.warning(f"Error checking notification file {filepath.name}: {e}")
1176 queue_files.append(filepath) # Keep it in case it's valid
1177
1178 if likes_deleted > 0:
1179 logger.info(f"Deleted {likes_deleted} like notifications from queue")
1180
1181 if not queue_files:
1182 return
1183
1184 logger.info(f"Processing {len(queue_files)} queued notifications")
1185
1186 # Log current statistics
1187 elapsed_time = time.time() - start_time
1188 total_messages = sum(message_counters.values())
1189 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1190
1191 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
1192
1193 for i, filepath in enumerate(queue_files, 1):
1194 # Determine if this is a priority notification
1195 is_priority = filepath.name.startswith("0_")
1196
1197 # Check for new notifications periodically during queue processing
1198 # Also check immediately after processing each priority item
1199 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
1200
1201 # If we just processed a priority item, immediately check for new priority notifications
1202 if is_priority and i > 1:
1203 should_check_notifications = True
1204
1205 if should_check_notifications:
1206 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
1207 try:
1208 # Fetch and queue new notifications without processing them
1209 new_count = fetch_and_queue_new_notifications(atproto_client)
1210
1211 if new_count > 0:
1212 logger.info(f"Added {new_count} new notifications to queue")
1213 # Reload the queue files to include the new items
1214 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1215 queue_files = updated_queue_files
1216 logger.info(f"Queue updated: now {len(queue_files)} total items")
1217 except Exception as e:
1218 logger.error(f"Error checking for new notifications: {e}")
1219
1220 priority_label = " [PRIORITY]" if is_priority else ""
1221 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
1222 try:
1223 # Load notification data
1224 with open(filepath, 'r') as f:
1225 notif_data = json.load(f)
1226
1227 # Process based on type using dict data directly
1228 success = False
1229 if notif_data['reason'] == "mention":
1230 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1231 if success:
1232 message_counters['mentions'] += 1
1233 elif notif_data['reason'] == "reply":
1234 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1235 if success:
1236 message_counters['replies'] += 1
1237 elif notif_data['reason'] == "follow":
1238 # author_handle = notif_data['author']['handle']
1239 # author_display_name = notif_data['author'].get('display_name', 'no display name')
1240 # follow_update = f"@{author_handle} ({author_display_name}) started following you."
1241 # follow_message = f"Update: {follow_update}"
1242 # logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
1243 # CLIENT.agents.messages.create(
1244 # agent_id = void_agent.id,
1245 # messages = [{"role":"user", "content": follow_message}]
1246 # )
1247 success = True # Follow updates are always successful
1248 # if success:
1249 # message_counters['follows'] += 1
1250 elif notif_data['reason'] == "repost":
1251 # Skip reposts silently
1252 success = True # Skip reposts but mark as successful to remove from queue
1253 if success:
1254 message_counters['reposts_skipped'] += 1
1255 elif notif_data['reason'] == "like":
1256 # Skip likes silently
1257 success = True # Skip likes but mark as successful to remove from queue
1258 if success:
1259 message_counters.setdefault('likes_skipped', 0)
1260 message_counters['likes_skipped'] += 1
1261 else:
1262 logger.warning(f"Unknown notification type: {notif_data['reason']}")
1263 success = True # Remove unknown types from queue
1264
1265 # Handle file based on processing result
                # Only a literal True means "processed and replied"; string results
                # ("no_reply", "ignored") are truthy and must fall through to the branches below
                if success is True:
1267 if testing_mode:
1268 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
1269 else:
1270 filepath.unlink()
1271 logger.info(f"Successfully processed and removed: {filepath.name}")
1272
1273 # Mark as processed to avoid reprocessing
1274 if NOTIFICATION_DB:
1275 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed')
1276 else:
1277 processed_uris = load_processed_notifications()
1278 processed_uris.add(notif_data['uri'])
1279 save_processed_notifications(processed_uris)
1280
1281 elif success is None: # Special case for moving to error directory
1282 error_path = QUEUE_ERROR_DIR / filepath.name
1283 filepath.rename(error_path)
1284 logger.warning(f"Moved {filepath.name} to errors directory")
1285
1286 # Also mark as processed to avoid retrying
1287 if NOTIFICATION_DB:
1288 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1289 else:
1290 processed_uris = load_processed_notifications()
1291 processed_uris.add(notif_data['uri'])
1292 save_processed_notifications(processed_uris)
1293
1294 elif success == "no_reply": # Special case for moving to no_reply directory
1295 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
1296 filepath.rename(no_reply_path)
1297 logger.info(f"Moved {filepath.name} to no_reply directory")
1298
1299 # Also mark as processed to avoid retrying
1300 if NOTIFICATION_DB:
1301 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1302 else:
1303 processed_uris = load_processed_notifications()
1304 processed_uris.add(notif_data['uri'])
1305 save_processed_notifications(processed_uris)
1306
1307 elif success == "ignored": # Special case for explicitly ignored notifications
1308 # For ignored notifications, we just delete them (not move to no_reply)
1309 filepath.unlink()
1310 logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
1311
1312 # Also mark as processed to avoid retrying
1313 if NOTIFICATION_DB:
1314 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1315 else:
1316 processed_uris = load_processed_notifications()
1317 processed_uris.add(notif_data['uri'])
1318 save_processed_notifications(processed_uris)
1319
1320 else:
1321 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
1322
1323 except Exception as e:
1324 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
1325 # Keep the file for retry later
1326
1327 except Exception as e:
1328 logger.error(f"Error loading queued notifications: {e}")
1329
1330
1331def fetch_and_queue_new_notifications(atproto_client):
1332 """Fetch new notifications and queue them without processing."""
1333 try:
1334 global NOTIFICATION_DB
1335
1336 # Get current time for marking notifications as seen
1337 logger.debug("Getting current time for notification marking...")
1338 last_seen_at = atproto_client.get_current_time_iso()
1339
1340 # Get timestamp of last processed notification for filtering
1341 last_processed_time = None
1342 if NOTIFICATION_DB:
1343 last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
1344 if last_processed_time:
1345 logger.debug(f"Last processed notification was at: {last_processed_time}")
1346
1347 # Fetch ALL notifications using pagination
1348 all_notifications = []
1349 cursor = None
1350 page_count = 0
1351 max_pages = 20 # Safety limit to prevent infinite loops
1352
1353 while page_count < max_pages:
1354 try:
1355 # Fetch notifications page
1356 if cursor:
1357 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1358 params={'cursor': cursor, 'limit': 100}
1359 )
1360 else:
1361 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1362 params={'limit': 100}
1363 )
1364
1365 page_count += 1
1366 page_notifications = notifications_response.notifications
1367
1368 if not page_notifications:
1369 break
1370
1371 all_notifications.extend(page_notifications)
1372
1373 # Check if there are more pages
1374 cursor = getattr(notifications_response, 'cursor', None)
1375 if not cursor:
1376 break
1377
1378 except Exception as e:
1379 logger.error(f"Error fetching notifications page {page_count}: {e}")
1380 break
1381
1382 # Now process all fetched notifications
1383 new_count = 0
1384 if all_notifications:
1385 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")
1386
1387 # Mark as seen first
1388 try:
1389 atproto_client.app.bsky.notification.update_seen(
1390 data={'seenAt': last_seen_at}
1391 )
1392 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
1393 except Exception as e:
1394 logger.error(f"Error marking notifications as seen: {e}")
1395
1396 # Debug counters
1397 skipped_read = 0
1398 skipped_likes = 0
1399 skipped_processed = 0
1400 skipped_old_timestamp = 0
1401 processed_uris = load_processed_notifications()
1402
1403 # Queue all new notifications (except likes)
1404 for notif in all_notifications:
1405 # Skip if older than last processed (when we have timestamp filtering)
1406 if last_processed_time and hasattr(notif, 'indexed_at'):
1407 if notif.indexed_at <= last_processed_time:
1408 skipped_old_timestamp += 1
1409 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
1410 continue
1411
1412 # Debug: Log is_read status but DON'T skip based on it
1413 if hasattr(notif, 'is_read') and notif.is_read:
1414 skipped_read += 1
1415 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")
1416
1417 # Skip likes
1418 if hasattr(notif, 'reason') and notif.reason == 'like':
1419 skipped_likes += 1
1420 continue
1421
1422 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif
1423
1424 # Skip likes in dict form too
1425 if notif_dict.get('reason') == 'like':
1426 continue
1427
1428 # Check if already processed
1429 notif_uri = notif_dict.get('uri', '')
1430 if notif_uri in processed_uris:
1431 skipped_processed += 1
1432 logger.debug(f"Skipping already processed: {notif_uri}")
1433 continue
1434
1435 # Check if it's a priority notification
1436 is_priority = False
1437
1438 # Priority for cameron.pfiffer.org notifications
1439 author_handle = notif_dict.get('author', {}).get('handle', '')
1440 if author_handle == "cameron.pfiffer.org":
1441 is_priority = True
1442
1443 # Also check for priority keywords in mentions
1444 if notif_dict.get('reason') == 'mention':
1445 # Get the mention text to check for priority keywords
1446 record = notif_dict.get('record', {})
1447 text = record.get('text', '')
1448 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']):
1449 is_priority = True
1450
1451 if save_notification_to_queue(notif_dict, is_priority=is_priority):
1452 new_count += 1
1453 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")
1454
1455 # Log summary of filtering
1456 logger.info(f"📊 Notification processing summary:")
1457 logger.info(f" • Total fetched: {len(all_notifications)}")
1458 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)")
1459 logger.info(f" • Skipped (likes): {skipped_likes}")
1460 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}")
1461 logger.info(f" • Skipped (already processed): {skipped_processed}")
1462 logger.info(f" • Queued for processing: {new_count}")
1463 else:
1464 logger.debug("No new notifications to queue")
1465
1466 return new_count
1467
1468 except Exception as e:
1469 logger.error(f"Error fetching and queueing notifications: {e}")
1470 return 0
1471
1472
1473def process_notifications(void_agent, atproto_client, testing_mode=False):
1474 """Fetch new notifications, queue them, and process the queue."""
1475 try:
1476 # Fetch and queue new notifications
1477 new_count = fetch_and_queue_new_notifications(atproto_client)
1478
1479 if new_count > 0:
1480 logger.info(f"Found {new_count} new notifications to process")
1481
1482 # Now process the entire queue (old + new notifications)
1483 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
1484
1485 except Exception as e:
1486 logger.error(f"Error processing notifications: {e}")
1487
1488
1489def send_synthesis_message(client: Letta, agent_id: str, agent_name: str = "void", atproto_client=None) -> None:
1490 """
1491    Send a synthesis message to the agent on a periodic schedule (default: every 10 minutes).
1492 This prompts the agent to synthesize its recent experiences.
1493
1494 Args:
1495 client: Letta client
1496 agent_id: Agent ID to send synthesis to
1497 agent_name: Agent name for temporal block labels
1498 atproto_client: Optional AT Protocol client for posting synthesis results
1499 """
1500 # Track attached temporal blocks for cleanup
1501 attached_temporal_labels = []
1502
1503 try:
1504 logger.info("🧠 Preparing synthesis with temporal journal blocks")
1505
1506 # Attach temporal blocks before synthesis
1507 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id, agent_name)
1508 if not success:
1509 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")
1510
1511 # Create synthesis prompt with agent-specific block names
1512 today = date.today()
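        # The journal block labels referenced in the prompt below are built with the
        # same format strings used by attach_temporal_blocks(), so the agent sees the
        # exact labels that were attached for this session.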
1513 synthesis_prompt = f"""Time for synthesis.
1514
1515This is your periodic opportunity to reflect on recent experiences and update your memory.
1516
1517The following temporal journal blocks are temporarily available for this session:
1518- {agent_name}_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
1519- {agent_name}_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
1520- {agent_name}_year_{today.year}: This year's journal ({today.year})
1521
1522You may use these blocks as you see fit. Synthesize your recent experiences into your memory as appropriate."""
1523
1524 logger.info("🧠 Sending enhanced synthesis prompt to agent")
1525
1526 # Send synthesis message with streaming to show tool use
1527 message_stream = client.agents.messages.create_stream(
1528 agent_id=agent_id,
1529 messages=[{"role": "user", "content": synthesis_prompt}],
1530 stream_tokens=False,
1531 max_steps=100
1532 )
1533
1534 # Track synthesis content for potential posting
1535 synthesis_posts = []
1536 ack_note = None
1537
1538 # Process the streaming response
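        # Chunks are dispatched on message_type: reasoning (optionally mirrored to
        # ATProto), tool calls/returns (logged, with archival inserts recorded),
        # the assistant message, and errors. annotate_ack notes and any
        # add_post_to_bluesky_reply_thread text are collected and handled after the loop.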
1539 for chunk in message_stream:
1540 if hasattr(chunk, 'message_type'):
1541 if chunk.message_type == 'reasoning_message':
1542 if SHOW_REASONING:
1543 print("\n◆ Reasoning")
1544 print(" ─────────")
1545 for line in chunk.reasoning.split('\n'):
1546 print(f" {line}")
1547
1548 # Create ATProto record for reasoning (if we have atproto client)
1549 if atproto_client and hasattr(chunk, 'reasoning'):
1550 try:
1551 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
1552 except Exception as e:
1553 logger.debug(f"Failed to create reasoning record during synthesis: {e}")
1554 elif chunk.message_type == 'tool_call_message':
1555 tool_name = chunk.tool_call.name
1556
1557 # Create ATProto record for tool call (if we have atproto client)
1558 if atproto_client:
1559 try:
1560 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
1561 bsky_utils.create_tool_call_record(
1562 atproto_client,
1563 tool_name,
1564 chunk.tool_call.arguments,
1565 tool_call_id
1566 )
1567 except Exception as e:
1568 logger.debug(f"Failed to create tool call record during synthesis: {e}")
1569 try:
1570 args = json.loads(chunk.tool_call.arguments)
1571 if tool_name == 'archival_memory_search':
1572 query = args.get('query', 'unknown')
1573 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
1574 elif tool_name == 'archival_memory_insert':
1575 content = args.get('content', '')
1576 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
1577
1578 # Record archival memory insert to AT Protocol
1579 if atproto_client:
1580 try:
1581 tags_str = args.get('tags', None)
1582 tags = None
1583 if tags_str:
1584 try:
1585 tags = json.loads(tags_str) if isinstance(tags_str, str) else tags_str
1586 except json.JSONDecodeError:
1587 logger.warning(f"Failed to parse tags from archival_memory_insert: {tags_str[:50]}...")
1588
1589 memory_result = bsky_utils.create_memory_record(atproto_client, content, tags)
1590 if memory_result:
1591 tags_info = f" ({len(tags)} tags)" if tags else ""
1592 logger.info(f"📝 Recorded archival memory to AT Protocol{tags_info}")
1593 except Exception as e:
1594 logger.debug(f"Failed to create memory record during synthesis: {e}")
1595 elif tool_name == 'update_block':
1596 label = args.get('label', 'unknown')
1597 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', ''))
1598 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
1599 elif tool_name == 'annotate_ack':
1600 note = args.get('note', '')
1601 if note:
1602 ack_note = note
1603 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
1604 elif tool_name == 'add_post_to_bluesky_reply_thread':
1605 text = args.get('text', '')
1606 synthesis_posts.append(text)
1607 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
1608 else:
1609 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
1610 if len(args_str) > 150:
1611 args_str = args_str[:150] + "..."
1612 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
1613                    except Exception:
1614                        log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
1615 elif chunk.message_type == 'tool_return_message':
1616 if chunk.status == 'success':
1617 log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green")
1618 else:
1619 log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red")
1620 elif chunk.message_type == 'assistant_message':
1621 print("\n▶ Synthesis Response")
1622 print(" ──────────────────")
1623 for line in chunk.content.split('\n'):
1624 print(f" {line}")
1625 elif chunk.message_type == 'error_message':
1626 # Dump full error object
1627 logger.error(f"Synthesis error_message: {chunk}")
1628 if hasattr(chunk, 'model_dump'):
1629 logger.error(f"Synthesis error (dict): {chunk.model_dump()}")
1630 elif hasattr(chunk, '__dict__'):
1631 logger.error(f"Synthesis error (vars): {vars(chunk)}")
1632
1633 if str(chunk) == 'done':
1634 break
1635
1636 logger.info("🧠 Synthesis message processed successfully")
1637
1638 # Handle synthesis acknowledgments if we have an atproto client
1639 if atproto_client and ack_note:
1640 try:
1641 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
1642 if result:
1643 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
1644 else:
1645 logger.warning("Failed to create synthesis acknowledgment")
1646 except Exception as e:
1647 logger.error(f"Error creating synthesis acknowledgment: {e}")
1648
1649 # Handle synthesis posts if any were generated
1650 if atproto_client and synthesis_posts:
1651 try:
1652 for post_text in synthesis_posts:
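                    # remove_outside_quotes is assumed to strip stray surrounding
                    # quotation marks from the agent's text before posting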
1653 cleaned_text = bsky_utils.remove_outside_quotes(post_text)
1654 response = bsky_utils.send_post(atproto_client, cleaned_text)
1655 if response:
1656 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
1657 else:
1658 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
1659 except Exception as e:
1660 logger.error(f"Error posting synthesis content: {e}")
1661
1662 except Exception as e:
1663 logger.error(f"Error sending synthesis message: {e}")
1664 finally:
1665 # Always detach temporal blocks after synthesis
1666 if attached_temporal_labels:
1667 logger.info("🧠 Detaching temporal journal blocks after synthesis")
1668 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels, agent_name)
1669 if not detach_success:
1670 logger.warning("Some temporal blocks may not have been detached properly")
1671
1672
1673def attach_temporal_blocks(client: Letta, agent_id: str, agent_name: str = "void") -> tuple:
1674 """
1675 Attach temporal journal blocks (day, month, year) to the agent for synthesis.
1676 Creates blocks if they don't exist.
1677
1678 Args:
1679 client: Letta client
1680 agent_id: Agent ID
1681 agent_name: Agent name for prefixing block labels (prevents collision across agents)
1682
1683 Returns:
1684 Tuple of (success: bool, attached_labels: list)
1685 """
1686 try:
1687 today = date.today()
1688
1689 # Generate temporal block labels with agent-specific prefix
1690 day_label = f"{agent_name}_day_{today.strftime('%Y_%m_%d')}"
1691 month_label = f"{agent_name}_month_{today.strftime('%Y_%m')}"
1692 year_label = f"{agent_name}_year_{today.year}"
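        # e.g. for agent "void" on 2025-03-14 (illustrative date):
        #   void_day_2025_03_14, void_month_2025_03, void_year_2025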
1693
1694 temporal_labels = [day_label, month_label, year_label]
1695 attached_labels = []
1696
1697 # Get current blocks attached to agent
1698 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1699 current_block_labels = {block.label for block in current_blocks}
1700 current_block_ids = {str(block.id) for block in current_blocks}
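        # Both labels and IDs are tracked so re-attachment is skipped whether a block
        # is matched by label or, after the global lookup below, by its block ID.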
1701
1702 for label in temporal_labels:
1703 try:
1704 # Skip if already attached
1705 if label in current_block_labels:
1706 logger.debug(f"Temporal block already attached: {label}")
1707 attached_labels.append(label)
1708 continue
1709
1710 # Check if block exists globally
1711 blocks = client.blocks.list(label=label)
1712
1713 if blocks and len(blocks) > 0:
1714 block = blocks[0]
1715 # Check if already attached by ID
1716 if str(block.id) in current_block_ids:
1717 logger.debug(f"Temporal block already attached by ID: {label}")
1718 attached_labels.append(label)
1719 continue
1720 else:
1721 # Create new temporal block with appropriate header
1722 if "day" in label:
1723 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}"
1724 initial_content = f"{header}\n\nNo entries yet for today."
1725 elif "month" in label:
1726 header = f"# Monthly Journal - {today.strftime('%B %Y')}"
1727 initial_content = f"{header}\n\nNo entries yet for this month."
1728 else: # year
1729 header = f"# Yearly Journal - {today.year}"
1730 initial_content = f"{header}\n\nNo entries yet for this year."
1731
1732 block = client.blocks.create(
1733 label=label,
1734 value=initial_content,
1735 limit=10000 # Larger limit for journal blocks
1736 )
1737 logger.info(f"Created new temporal block: {label}")
1738
1739 # Attach the block
1740 client.agents.blocks.attach(
1741 agent_id=agent_id,
1742 block_id=str(block.id)
1743 )
1744 attached_labels.append(label)
1745 logger.info(f"Attached temporal block: {label}")
1746
1747 except Exception as e:
1748 # Check for duplicate constraint errors
1749 error_str = str(e)
1750 if "duplicate key value violates unique constraint" in error_str:
1751 logger.debug(f"Temporal block already attached (constraint): {label}")
1752 attached_labels.append(label)
1753 else:
1754 logger.warning(f"Failed to attach temporal block {label}: {e}")
1755
1756 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
1757 return True, attached_labels
1758
1759 except Exception as e:
1760 logger.error(f"Error attaching temporal blocks: {e}")
1761 return False, []
1762
1763
1764def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None, agent_name: str = "void") -> bool:
1765 """
1766 Detach temporal journal blocks from the agent after synthesis.
1767
1768 Args:
1769 client: Letta client
1770 agent_id: Agent ID
1771 labels_to_detach: Optional list of specific labels to detach.
1772 If None, detaches all temporal blocks for this agent.
1773 agent_name: Agent name for prefixing block labels (prevents collision across agents)
1774
1775 Returns:
1776 bool: Success status
1777 """
1778 try:
1779 # If no specific labels provided, generate today's labels
1780 if labels_to_detach is None:
1781 today = date.today()
1782 labels_to_detach = [
1783 f"{agent_name}_day_{today.strftime('%Y_%m_%d')}",
1784 f"{agent_name}_month_{today.strftime('%Y_%m')}",
1785 f"{agent_name}_year_{today.year}"
1786 ]
1787
1788 # Get current blocks and build label to ID mapping
1789 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1790 block_label_to_id = {block.label: str(block.id) for block in current_blocks}
1791
1792 detached_count = 0
1793 for label in labels_to_detach:
1794 if label in block_label_to_id:
1795 try:
1796 client.agents.blocks.detach(
1797 agent_id=agent_id,
1798 block_id=block_label_to_id[label]
1799 )
1800 detached_count += 1
1801 logger.debug(f"Detached temporal block: {label}")
1802 except Exception as e:
1803 logger.warning(f"Failed to detach temporal block {label}: {e}")
1804 else:
1805 logger.debug(f"Temporal block not attached: {label}")
1806
1807 logger.info(f"Detached {detached_count} temporal blocks")
1808 return True
1809
1810 except Exception as e:
1811 logger.error(f"Error detaching temporal blocks: {e}")
1812 return False
1813
1814
1815def main():
1816 # Parse command line arguments
1817 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
1818 parser.add_argument('--config', type=str, default='configs/config.yaml', help='Path to config file (default: configs/config.yaml)')
1819 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
1820 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
1821 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
1822 # --rich option removed as we now use simple text formatting
1823 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
1824 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
1825 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
1826 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
1827 parser.add_argument('--debug', action='store_true', help='Enable debug logging')
1828 args = parser.parse_args()
1829
1830 # Initialize configuration with custom path
1831 global letta_config, CLIENT, QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR, PROCESSED_NOTIFICATIONS_FILE, NOTIFICATION_DB
1832 get_config(args.config) # Initialize the global config instance
1833 letta_config = get_letta_config()
1834
1835 # Initialize queue paths from config
1836 queue_config = get_queue_config()
1837 QUEUE_DIR = Path(queue_config['base_dir'])
1838 QUEUE_ERROR_DIR = Path(queue_config['error_dir'])
1839 QUEUE_NO_REPLY_DIR = Path(queue_config['no_reply_dir'])
1840 PROCESSED_NOTIFICATIONS_FILE = Path(queue_config['processed_file'])
1841
1842 # Get bot name for logging
1843 bot_name = queue_config['bot_name']
1844
1845 # Create queue directories
1846 QUEUE_DIR.mkdir(exist_ok=True)
1847 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
1848 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
1849
1850 # Create Letta client with configuration
1851 CLIENT_PARAMS = {
1852 'token': letta_config['api_key'],
1853 'timeout': letta_config['timeout']
1854 }
1855 if letta_config.get('base_url'):
1856 CLIENT_PARAMS['base_url'] = letta_config['base_url']
1857 CLIENT = Letta(**CLIENT_PARAMS)
1858
1859 # Configure logging based on command line arguments
1860 if args.simple_logs:
1861 log_format = f"{bot_name} - %(levelname)s - %(message)s"
1862 else:
1863 # Create custom formatter with symbols
1864 class SymbolFormatter(logging.Formatter):
1865 """Custom formatter that adds symbols for different log levels"""
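            # Example output line: ✓ 12:34:56 │ void │ INFO  │ Connected to Bluesky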
1866
1867 SYMBOLS = {
1868 logging.DEBUG: '◇',
1869 logging.INFO: '✓',
1870 logging.WARNING: '⚠',
1871 logging.ERROR: '✗',
1872 logging.CRITICAL: '‼'
1873 }
1874
1875 def __init__(self, bot_name):
1876 super().__init__()
1877 self.bot_name = bot_name
1878
1879 def format(self, record):
1880 # Get the symbol for this log level
1881 symbol = self.SYMBOLS.get(record.levelno, '•')
1882
1883 # Format time as HH:MM:SS
1884 timestamp = self.formatTime(record, "%H:%M:%S")
1885
1886 # Build the formatted message
1887 level_name = f"{record.levelname:<5}" # Left-align, 5 chars
1888
1889 # Use vertical bar as separator
1890 parts = [symbol, timestamp, '│', self.bot_name, '│', level_name, '│', record.getMessage()]
1891
1892 return ' '.join(parts)
1893
1894 # Reset logging configuration
1895 for handler in logging.root.handlers[:]:
1896 logging.root.removeHandler(handler)
1897
1898 # Create handler with custom formatter
1899 handler = logging.StreamHandler()
1900 if not args.simple_logs:
1901 handler.setFormatter(SymbolFormatter(bot_name))
1902 else:
1903 handler.setFormatter(logging.Formatter(log_format))
1904
1905 # Configure root logger
1906 logging.root.setLevel(logging.INFO)
1907 logging.root.addHandler(handler)
1908
1909    global logger, prompt_logger  # console no longer used (Rich removed)
1910 logger = logging.getLogger("void_bot")
1911 logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
1912
1913 # Create a separate logger for prompts (set to WARNING to hide by default)
1914 prompt_logger = logging.getLogger("void_bot.prompts")
1915 if args.reasoning:
1916 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used
1917 else:
1918 prompt_logger.setLevel(logging.WARNING) # Hide by default
1919
1920    # Silence httpx request logging (only CRITICAL messages pass through)
1921 logging.getLogger("httpx").setLevel(logging.CRITICAL)
1922
1923 # Create Rich console for pretty printing
1924 # Console no longer used - simple text formatting
1925
1926 global TESTING_MODE, SKIP_GIT, SHOW_REASONING
1927 TESTING_MODE = args.test
1928
1929 # Store no-git flag globally for use in export_agent_state calls
1930 SKIP_GIT = args.no_git
1931
1932 # Store rich flag globally
1933 # Rich formatting no longer used
1934
1935 # Store reasoning flag globally
1936 SHOW_REASONING = args.reasoning
1937
1938 if TESTING_MODE:
1939 logger.info("=== RUNNING IN TESTING MODE ===")
1940 logger.info(" - No messages will be sent to Bluesky")
1941 logger.info(" - Queue files will not be deleted")
1942 logger.info(" - Notifications will not be marked as seen")
1943 print("\n")
1944
1945 # Check for synthesis-only mode
1946 SYNTHESIS_ONLY = args.synthesis_only
1947 if SYNTHESIS_ONLY:
1948 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
1949 logger.info(" - Only synthesis messages will be sent")
1950 logger.info(" - No notification processing")
1951 logger.info(" - No Bluesky client needed")
1952 print("\n")
1953    # Main bot loop: continuously monitors for notifications.
1954 global start_time
1955 start_time = time.time()
1956 logger.info("=== STARTING VOID BOT ===")
1957 void_agent = initialize_void()
1958 logger.info(f"Void agent initialized: {void_agent.id}")
1959
1960 # Initialize notification database with config-based path
1961 logger.info("Initializing notification database...")
1962 NOTIFICATION_DB = NotificationDB(db_path=queue_config['db_path'])
1963
1964 # Migrate from old JSON format if it exists
1965 if PROCESSED_NOTIFICATIONS_FILE.exists():
1966 logger.info("Found old processed_notifications.json, migrating to database...")
1967 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))
1968
1969 # Log database stats
1970 db_stats = NOTIFICATION_DB.get_stats()
1971 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")
1972
1973 # Clean up old records
1974 NOTIFICATION_DB.cleanup_old_records(days=7)
1975
1976 # Ensure correct tools are attached for Bluesky
1977 logger.info("Configuring tools for Bluesky platform...")
1978 try:
1979 from tool_manager import ensure_platform_tools
1980 ensure_platform_tools('bluesky', void_agent.id)
1981 except Exception as e:
1982 logger.error(f"Failed to configure platform tools: {e}")
1983 logger.warning("Continuing with existing tool configuration")
1984
1985 # Check if agent has required tools
1986 if hasattr(void_agent, 'tools') and void_agent.tools:
1987 tool_names = [tool.name for tool in void_agent.tools]
1988 # Check for bluesky-related tools
1989 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
1990 if not bluesky_tools:
1991 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
1992 else:
1993 logger.warning("Agent has no tools registered!")
1994
1995 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
1996 if not SYNTHESIS_ONLY:
1997 atproto_client = bsky_utils.default_login()
1998 logger.info("Connected to Bluesky")
1999 else:
2000 # In synthesis-only mode, still connect for acks and posts (unless in test mode)
2001 if not args.test:
2002 atproto_client = bsky_utils.default_login()
2003 logger.info("Connected to Bluesky (for synthesis acks/posts)")
2004 else:
2005 atproto_client = None
2006 logger.info("Skipping Bluesky connection (test mode)")
2007
2008 # Configure intervals
2009 CLEANUP_INTERVAL = args.cleanup_interval
2010 SYNTHESIS_INTERVAL = args.synthesis_interval
2011
2012 # Synthesis-only mode
2013 if SYNTHESIS_ONLY:
2014 if SYNTHESIS_INTERVAL <= 0:
2015 logger.error("Synthesis-only mode requires --synthesis-interval > 0")
2016 return
2017
2018 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
2019
2020 while True:
2021 try:
2022 # Send synthesis message immediately on first run
2023                # Send synthesis message (runs immediately on the first iteration, then after each interval)
2024 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)
2025
2026 # Wait for next interval
2027 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
2028 sleep(SYNTHESIS_INTERVAL)
2029
2030 except KeyboardInterrupt:
2031 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
2032                return  # exit main(); don't fall through to normal notification processing
2033 except Exception as e:
2034 logger.error(f"Error in synthesis loop: {e}")
2035 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
2036 sleep(SYNTHESIS_INTERVAL)
2037
2038 # Normal mode with notification processing
2039 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
2040
2041 cycle_count = 0
2042
2043 if CLEANUP_INTERVAL > 0:
2044 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
2045 else:
2046 logger.info("User block cleanup disabled")
2047
2048 if SYNTHESIS_INTERVAL > 0:
2049 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
2050 else:
2051 logger.info("Synthesis messages disabled")
2052
2053 while True:
2054 try:
2055 cycle_count += 1
2056 process_notifications(void_agent, atproto_client, TESTING_MODE)
2057
2058 # Check if synthesis interval has passed
2059 if SYNTHESIS_INTERVAL > 0:
2060 current_time = time.time()
2061 global last_synthesis_time
2062 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
2063 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
2064 send_synthesis_message(CLIENT, void_agent.id, void_agent.name, atproto_client)
2065 last_synthesis_time = current_time
2066
2067 # Run periodic cleanup every N cycles
2068 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
2069 # Check if autofollow is enabled and sync followers
2070 from config_loader import get_bluesky_config
2071 bluesky_config = get_bluesky_config()
2072 if bluesky_config.get('autofollow', False):
2073 logger.info("🔄 Syncing followers (autofollow enabled)")
2074 try:
2075 sync_result = bsky_utils.sync_followers(atproto_client, dry_run=False)
2076 if 'error' in sync_result:
2077 logger.error(f"Autofollow failed: {sync_result['error']}")
2078 else:
2079 if sync_result['newly_followed']:
2080 logger.info(f"✓ Followed {len(sync_result['newly_followed'])} new users: {', '.join(sync_result['newly_followed'])}")
2081 else:
2082 logger.debug(f"No new followers to follow back ({sync_result['followers_count']} followers, {sync_result['following_count']} following)")
2083 if sync_result.get('errors'):
2084 logger.warning(f"Some follows failed: {len(sync_result['errors'])} errors")
2085 except Exception as e:
2086 logger.error(f"Error during autofollow sync: {e}")
2087
2088 # Also check database health when doing cleanup
2089 if NOTIFICATION_DB:
2090 db_stats = NOTIFICATION_DB.get_stats()
2091 pending = db_stats.get('status_pending', 0)
2092 errors = db_stats.get('status_error', 0)
2093
2094 if pending > 50:
2095 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
2096 if errors > 20:
2097 logger.warning(f"⚠️ Queue health check: {errors} error notifications")
2098
2099 # Periodic cleanup of old records
2100                    if cycle_count % (CLEANUP_INTERVAL * 10) == 0:  # every CLEANUP_INTERVAL * 10 cycles (100 with the default)
2101 logger.info("Running database cleanup of old records...")
2102 NOTIFICATION_DB.cleanup_old_records(days=7)
2103
2104 # Log cycle completion with stats
2105 elapsed_time = time.time() - start_time
2106 total_messages = sum(message_counters.values())
2107 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
2108
2109 if total_messages > 0:
2110 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
2111 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
2112
2113 except KeyboardInterrupt:
2114 # Final stats
2115 elapsed_time = time.time() - start_time
2116 total_messages = sum(message_counters.values())
2117 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
2118
2119 logger.info("=== BOT STOPPED BY USER ===")
2120 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
2121 logger.info(f" - {message_counters['mentions']} mentions")
2122 logger.info(f" - {message_counters['replies']} replies")
2123 logger.info(f" - {message_counters['follows']} follows")
2124 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
2125 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
2126
2127 # Close database connection
2128 if NOTIFICATION_DB:
2129 logger.info("Closing database connection...")
2130 NOTIFICATION_DB.close()
2131
2132 break
2133 except Exception as e:
2134 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
2135 logger.error(f"Error details: {e}")
2136 # Wait a bit longer on errors
2137 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
2138 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
2139
2140
2141if __name__ == "__main__":
2142 main()
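
# Example invocations (script filename assumed; flags are defined in main() above):
#   python void_bot.py --config configs/config.yaml
#   python void_bot.py --test --debug
#   python void_bot.py --synthesis-only --synthesis-interval 1800
#   python void_bot.py --no-git --cleanup-interval 0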