A digital person for Bluesky
1# Rich imports removed - using simple text formatting
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20
21import bsky_utils
22from tools.blocks import attach_user_blocks, detach_user_blocks
23from datetime import date
24from notification_db import NotificationDB
25
26def extract_handles_from_data(data):
27 """Recursively extract all unique handles from nested data structure."""
28 handles = set()
29
30 def _extract_recursive(obj):
31 if isinstance(obj, dict):
32 # Check if this dict has a 'handle' key
33 if 'handle' in obj:
34 handles.add(obj['handle'])
35 # Recursively check all values
36 for value in obj.values():
37 _extract_recursive(value)
38 elif isinstance(obj, list):
39 # Recursively check all list items
40 for item in obj:
41 _extract_recursive(item)
42
43 _extract_recursive(data)
44 return list(handles)
45
46# Logging will be configured after argument parsing
47logger = None
48prompt_logger = None
49# Simple text formatting (Rich no longer used)
50SHOW_REASONING = False
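# Presumably toggled by a --reasoning CLI flag during argument parsing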
51last_archival_query = "archival memory search"
52
53def log_with_panel(message, title=None, border_color="white"):
54 """Log a message with Unicode box-drawing characters"""
55 if title:
56 # Map old color names to appropriate symbols
57 symbol_map = {
58 "blue": "⚙", # Tool calls
59 "green": "✓", # Success/completion
60 "yellow": "◆", # Reasoning
61 "red": "✗", # Errors
62 "white": "▶", # Default/mentions
63 "cyan": "✎", # Posts
64 }
65 symbol = symbol_map.get(border_color, "▶")
66
67 print(f"\n{symbol} {title}")
68 print(f" {'─' * len(title)}")
69 # Indent message lines
70 for line in message.split('\n'):
71 print(f" {line}")
72 else:
73 print(message)
74
75
76# Create a client with extended timeout for LLM operations
CLIENT = Letta(
78 token=os.environ["LETTA_API_KEY"],
79 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout
80)
81
82# Use the "Bluesky" project
83PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"
84
85# Notification check delay
86FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response
87
88# Check for new notifications every N queue items
89CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing
90
91# Queue directory
92QUEUE_DIR = Path("queue")
93QUEUE_DIR.mkdir(exist_ok=True)
94QUEUE_ERROR_DIR = Path("queue/errors")
95QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
96QUEUE_NO_REPLY_DIR = Path("queue/no_reply")
97QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
98PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")
99
100# Maximum number of processed notifications to track
101MAX_PROCESSED_NOTIFICATIONS = 10000
102
103# Message tracking counters
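# Keys incremented below: 'mentions', 'replies', 'follows', 'reposts_skipped', 'likes_skipped'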
104message_counters = defaultdict(int)
105start_time = time.time()
106
107# Testing mode flag
108TESTING_MODE = False
109
110# Skip git operations flag
111SKIP_GIT = False
112
113# Synthesis message tracking
114last_synthesis_time = time.time()
115
116# Database for notification tracking
117NOTIFICATION_DB = None
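# Presumably assigned a NotificationDB instance at startup; while it is None, the JSON fallback paths below are used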
118
119def export_agent_state(client, agent, skip_git=False):
120 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
121 try:
122 # Confirm export with user unless git is being skipped
123 if not skip_git:
124 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
125 if response not in ['y', 'yes']:
126 logger.info("Agent export cancelled by user.")
127 return
128 else:
129 logger.info("Exporting agent state (git staging disabled)")
130
131 # Create directories if they don't exist
132 os.makedirs("agent_archive", exist_ok=True)
133 os.makedirs("agents", exist_ok=True)
134
135 # Export agent data
136 logger.info(f"Exporting agent {agent.id}. This takes some time...")
137 agent_data = client.agents.export_file(agent_id=agent.id)
138
139 # Save timestamped archive copy
140 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
141 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
142 with open(archive_file, 'w', encoding='utf-8') as f:
143 json.dump(agent_data, f, indent=2, ensure_ascii=False)
144
145 # Save current agent state
146 current_file = os.path.join("agents", "void.af")
147 with open(current_file, 'w', encoding='utf-8') as f:
148 json.dump(agent_data, f, indent=2, ensure_ascii=False)
149
150 logger.info(f"Agent exported to {archive_file} and {current_file}")
151
152 # Git add only the current agent file (archive is ignored) unless skip_git is True
153 if not skip_git:
154 try:
155 subprocess.run(["git", "add", current_file], check=True, capture_output=True)
156 logger.info("Added current agent file to git staging")
157 except subprocess.CalledProcessError as e:
158 logger.warning(f"Failed to git add agent file: {e}")
159
160 except Exception as e:
161 logger.error(f"Failed to export agent: {e}")
162
163def initialize_void():
164 logger.info("Starting void agent initialization...")
165
166 # Get the configured void agent by ID
167 logger.info("Loading void agent from config...")
168 from config_loader import get_letta_config
169 letta_config = get_letta_config()
170 agent_id = letta_config['agent_id']
171
172 try:
173 void_agent = CLIENT.agents.retrieve(agent_id=agent_id)
174 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})")
175 except Exception as e:
176 logger.error(f"Failed to load void agent {agent_id}: {e}")
177 logger.error("Please ensure the agent_id in config.yaml is correct")
178 raise e
179
180 # Export agent state
181 logger.info("Exporting agent state...")
182 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
183
184 # Log agent details
185 logger.info(f"Void agent details - ID: {void_agent.id}")
186 logger.info(f"Agent name: {void_agent.name}")
187 if hasattr(void_agent, 'llm_config'):
188 logger.info(f"Agent model: {void_agent.llm_config.model}")
189 logger.info(f"Agent project_id: {void_agent.project_id}")
190 if hasattr(void_agent, 'tools'):
191 logger.info(f"Agent has {len(void_agent.tools)} tools")
192 for tool in void_agent.tools[:3]: # Show first 3 tools
193 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")
194
195 return void_agent
196
197
198def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
199 """Process a mention and generate a reply using the Letta agent.
200
    Args:
        void_agent: The Letta agent instance
        atproto_client: The AT Protocol client
        notification_data: The notification data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, skip actual Bluesky posting and keep queue files

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
        "ignored": Notification was explicitly ignored by the agent, delete from queue
    """
213 import uuid
214
215 # Generate correlation ID for tracking this notification through the pipeline
216 correlation_id = str(uuid.uuid4())[:8]
217
218 try:
219 logger.info(f"[{correlation_id}] Starting process_mention", extra={
220 'correlation_id': correlation_id,
221 'notification_type': type(notification_data).__name__
222 })
223
224 # Handle both dict and object inputs for backwards compatibility
225 if isinstance(notification_data, dict):
226 uri = notification_data['uri']
227 mention_text = notification_data.get('record', {}).get('text', '')
228 author_handle = notification_data['author']['handle']
229 author_name = notification_data['author'].get('display_name') or author_handle
230 else:
231 # Legacy object access
232 uri = notification_data.uri
233 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
234 author_handle = notification_data.author.handle
235 author_name = notification_data.author.display_name or author_handle
236
237 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={
238 'correlation_id': correlation_id,
239 'author_handle': author_handle,
240 'author_name': author_name,
241 'mention_uri': uri,
242 'mention_text_length': len(mention_text),
243 'mention_preview': mention_text[:100] if mention_text else ''
244 })
245
246 # Retrieve the entire thread associated with the mention
247 try:
248 thread = atproto_client.app.bsky.feed.get_post_thread({
249 'uri': uri,
250 'parent_height': 40,
251 'depth': 10
252 })
253 except Exception as e:
254 error_str = str(e)
255 # Check if this is a NotFound error
256 if 'NotFound' in error_str or 'Post not found' in error_str:
257 logger.warning(f"Post not found for URI {uri}, removing from queue")
258 return True # Return True to remove from queue
259 else:
260 # Re-raise other errors
261 logger.error(f"Error fetching thread: {e}")
262 raise
263
264 # Get thread context as YAML string
265 logger.debug("Converting thread to YAML string")
266 try:
267 thread_context = thread_to_yaml_string(thread)
268 logger.debug(f"Thread context generated, length: {len(thread_context)} characters")
269
270 # Check if #voidstop appears anywhere in the thread
271 if "#voidstop" in thread_context.lower():
272 logger.info("Found #voidstop in thread context, skipping this mention")
273 return True # Return True to remove from queue
274
275 # Also check the mention text directly
276 if "#voidstop" in mention_text.lower():
277 logger.info("Found #voidstop in mention text, skipping this mention")
278 return True # Return True to remove from queue
279
280 # Create a more informative preview by extracting meaningful content
281 lines = thread_context.split('\n')
282 meaningful_lines = []
283
284 for line in lines:
285 stripped = line.strip()
286 if not stripped:
287 continue
288
289 # Look for lines with actual content (not just structure)
290 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
291 meaningful_lines.append(line)
292 if len(meaningful_lines) >= 5:
293 break
294
295 if meaningful_lines:
296 preview = '\n'.join(meaningful_lines)
297 logger.debug(f"Thread content preview:\n{preview}")
298 else:
299 # If no content fields found, just show it's a thread structure
300 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
301 except Exception as yaml_error:
302 import traceback
303 logger.error(f"Error converting thread to YAML: {yaml_error}")
304 logger.error(f"Full traceback:\n{traceback.format_exc()}")
305 logger.error(f"Thread type: {type(thread)}")
306 if hasattr(thread, '__dict__'):
307 logger.error(f"Thread attributes: {thread.__dict__}")
308 # Try to continue with a simple context
309 thread_context = f"Error processing thread context: {str(yaml_error)}"
310
311 # Create a prompt for the Letta agent with thread context
312 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
313
314MOST RECENT POST (the mention you're responding to):
315"{mention_text}"
316
317FULL THREAD CONTEXT:
318```yaml
319{thread_context}
320```
321
322The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.
323
324To reply, use the add_post_to_bluesky_reply_thread tool:
325- Each call creates one post (max 300 characters)
326- For most responses, a single call is sufficient
327- Only use multiple calls for threaded replies when:
328 * The topic requires extended explanation that cannot fit in 300 characters
329 * You're explicitly asked for a detailed/long response
330 * The conversation naturally benefits from a structured multi-part answer
331- Avoid unnecessary threads - be concise when possible"""
332
333 # Extract all handles from notification and thread data
334 all_handles = set()
335 all_handles.update(extract_handles_from_data(notification_data))
336 all_handles.update(extract_handles_from_data(thread.model_dump()))
337 unique_handles = list(all_handles)
338
339 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
340
341 # Check if any handles are in known_bots list
        from tools.bot_detection import check_known_bots, should_respond_to_bot_thread
344
345 try:
346 # Check for known bots in thread
347 bot_check_result = check_known_bots(unique_handles, void_agent)
348 bot_check_data = json.loads(bot_check_result)
349
350 # TEMPORARILY DISABLED: Bot detection causing issues with normal users
351 # TODO: Re-enable after debugging why normal users are being flagged as bots
352 if False: # bot_check_data.get("bot_detected", False):
353 detected_bots = bot_check_data.get("detected_bots", [])
354 logger.info(f"Bot detected in thread: {detected_bots}")
355
356 # Decide whether to respond (10% chance)
357 if not should_respond_to_bot_thread():
358 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}")
359 # Return False to keep in queue for potential later processing
360 return False
361 else:
362 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}")
363 else:
364 logger.debug("Bot detection disabled - processing all notifications")
365
366 except Exception as bot_check_error:
367 logger.warning(f"Error checking for bots: {bot_check_error}")
368 # Continue processing if bot check fails
369
370 # Attach user blocks before agent call
371 attached_handles = []
372 if unique_handles:
373 try:
374 logger.debug(f"Attaching user blocks for handles: {unique_handles}")
375 attach_result = attach_user_blocks(unique_handles, void_agent)
376 attached_handles = unique_handles # Track successfully attached handles
377 logger.debug(f"Attach result: {attach_result}")
378 except Exception as attach_error:
379 logger.warning(f"Failed to attach user blocks: {attach_error}")
380 # Continue without user blocks rather than failing completely
381
382 # Get response from Letta agent
383 # Format with Unicode characters
384 title = f"MENTION FROM @{author_handle}"
385 print(f"\n▶ {title}")
386 print(f" {'═' * len(title)}")
387 # Indent the mention text
388 for line in mention_text.split('\n'):
389 print(f" {line}")
390
391 # Log prompt details to separate logger
392 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
393
394 # Log concise prompt info to main logger
395 thread_handles_count = len(unique_handles)
396 prompt_char_count = len(prompt)
397 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars")
398
399 try:
400 # Use streaming to avoid 524 timeout errors
401 message_stream = CLIENT.agents.messages.create_stream(
402 agent_id=void_agent.id,
403 messages=[{"role": "user", "content": prompt}],
404 stream_tokens=False, # Step streaming only (faster than token streaming)
405 max_steps=100
406 )
407
408 # Collect the streaming response
409 all_messages = []
410 for chunk in message_stream:
411 # Log condensed chunk info
412 if hasattr(chunk, 'message_type'):
413 if chunk.message_type == 'reasoning_message':
414 # Show full reasoning without truncation
415 if SHOW_REASONING:
416 # Format with Unicode characters
417 print("\n◆ Reasoning")
418 print(" ─────────")
419 # Indent reasoning lines
420 for line in chunk.reasoning.split('\n'):
421 print(f" {line}")
422 else:
423 # Default log format (only when --reasoning is used due to log level)
424 # Format with Unicode characters
425 print("\n◆ Reasoning")
426 print(" ─────────")
427 # Indent reasoning lines
428 for line in chunk.reasoning.split('\n'):
429 print(f" {line}")
430
431 # Create ATProto record for reasoning (unless in testing mode)
432 if not testing_mode and hasattr(chunk, 'reasoning'):
433 try:
434 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
435 except Exception as e:
436 logger.debug(f"Failed to create reasoning record: {e}")
437 elif chunk.message_type == 'tool_call_message':
438 # Parse tool arguments for better display
439 tool_name = chunk.tool_call.name
440
441 # Create ATProto record for tool call (unless in testing mode)
442 if not testing_mode:
443 try:
444 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
445 bsky_utils.create_tool_call_record(
446 atproto_client,
447 tool_name,
448 chunk.tool_call.arguments,
449 tool_call_id
450 )
451 except Exception as e:
452 logger.debug(f"Failed to create tool call record: {e}")
453
454 try:
455 args = json.loads(chunk.tool_call.arguments)
456 # Format based on tool type
457 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']:
458 # Extract the text being posted
459 text = args.get('text', '')
460 if text:
461 # Format with Unicode characters
462 print("\n✎ Bluesky Post")
463 print(" ────────────")
464 # Indent post text
465 for line in text.split('\n'):
466 print(f" {line}")
467 else:
468 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
469 elif tool_name == 'archival_memory_search':
470 query = args.get('query', 'unknown')
471 global last_archival_query
472 last_archival_query = query
473 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
474 elif tool_name == 'archival_memory_insert':
475 content = args.get('content', '')
476 # Show the full content being inserted
477 log_with_panel(content, f"Tool call: {tool_name}", "blue")
478 elif tool_name == 'update_block':
479 label = args.get('label', 'unknown')
480 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
481 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
482 else:
483 # Generic display for other tools
484 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
485 if len(args_str) > 150:
486 args_str = args_str[:150] + "..."
487 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                        except Exception:
489 # Fallback to original format if parsing fails
490 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
491 elif chunk.message_type == 'tool_return_message':
492 # Enhanced tool result logging
493 tool_name = chunk.name
494 status = chunk.status
495
496 if status == 'success':
497 # Try to show meaningful result info based on tool type
498 if hasattr(chunk, 'tool_return') and chunk.tool_return:
499 result_str = str(chunk.tool_return)
500 if tool_name == 'archival_memory_search':
501
502 try:
503 # Handle both string and list formats
504 if isinstance(chunk.tool_return, str):
505 # The string format is: "([{...}, {...}], count)"
506 # We need to extract just the list part
507 if chunk.tool_return.strip():
508 # Find the list part between the first [ and last ]
509 start_idx = chunk.tool_return.find('[')
510 end_idx = chunk.tool_return.rfind(']')
511 if start_idx != -1 and end_idx != -1:
512 list_str = chunk.tool_return[start_idx:end_idx+1]
513 # Use ast.literal_eval since this is Python literal syntax, not JSON
514 import ast
515 results = ast.literal_eval(list_str)
516 else:
517 logger.warning("Could not find list in archival_memory_search result")
518 results = []
519 else:
520 logger.warning("Empty string returned from archival_memory_search")
521 results = []
522 else:
523 # If it's already a list, use directly
524 results = chunk.tool_return
525
526 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green")
527
528 # Use the captured search query from the tool call
529 search_query = last_archival_query
530
531 # Combine all results into a single text block
532 content_text = ""
533 for i, entry in enumerate(results, 1):
534 timestamp = entry.get('timestamp', 'N/A')
535 content = entry.get('content', '')
536 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n"
537
538 # Format with Unicode characters
539 title = f"{search_query} ({len(results)} results)"
540 print(f"\n⚙ {title}")
541 print(f" {'─' * len(title)}")
542 # Indent content text
543 for line in content_text.strip().split('\n'):
544 print(f" {line}")
545
546 except Exception as e:
547 logger.error(f"Error formatting archival memory results: {e}")
548 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green")
549 elif tool_name == 'add_post_to_bluesky_reply_thread':
550 # Just show success for bluesky posts, the text was already shown in tool call
551 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green")
552 elif tool_name == 'archival_memory_insert':
553 # Skip archival memory insert results (always returns None)
554 pass
555 elif tool_name == 'update_block':
556 log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green")
557 else:
558 # Generic success with preview
559 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
560 log_with_panel(preview, f"Tool result: {tool_name} ✓", "green")
561 else:
562 log_with_panel("Success", f"Tool result: {tool_name} ✓", "green")
563 elif status == 'error':
564 # Show error details
565 if tool_name == 'add_post_to_bluesky_reply_thread':
566 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred"
567 log_with_panel(error_str, f"Bluesky Post ✗", "red")
568 elif tool_name == 'archival_memory_insert':
569 # Skip archival memory insert errors too
570 pass
571 else:
572 error_preview = ""
573 if hasattr(chunk, 'tool_return') and chunk.tool_return:
574 error_str = str(chunk.tool_return)
575 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
576 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red")
577 else:
578 log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red")
579 else:
580 logger.info(f"Tool result: {tool_name} - {status}")
581 elif chunk.message_type == 'assistant_message':
582 # Format with Unicode characters
583 print("\n▶ Assistant Response")
584 print(" ──────────────────")
585 # Indent response text
586 for line in chunk.content.split('\n'):
587 print(f" {line}")
588 else:
589 # Filter out verbose message types
590 if chunk.message_type not in ['usage_statistics', 'stop_reason']:
591 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...")
592 else:
593 logger.info(f"📦 Stream status: {chunk}")
594
595 # Log full chunk for debugging
596 logger.debug(f"Full streaming chunk: {chunk}")
597 all_messages.append(chunk)
598 if str(chunk) == 'done':
599 break
600
601 # Convert streaming response to standard format for compatibility
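            # type() builds a lightweight stand-in object exposing .messages so the
            # non-streaming parsing code below can run unchanged on the collected chunks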
602 message_response = type('StreamingResponse', (), {
603 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
604 })()
605 except Exception as api_error:
606 import traceback
607 error_str = str(api_error)
608 logger.error(f"Letta API error: {api_error}")
609 logger.error(f"Error type: {type(api_error).__name__}")
610 logger.error(f"Full traceback:\n{traceback.format_exc()}")
611 logger.error(f"Mention text was: {mention_text}")
612 logger.error(f"Author: @{author_handle}")
613 logger.error(f"URI: {uri}")
614
615
616 # Try to extract more info from different error types
617 if hasattr(api_error, 'response'):
618 logger.error(f"Error response object exists")
619 if hasattr(api_error.response, 'text'):
620 logger.error(f"Response text: {api_error.response.text}")
621 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
622 try:
623 logger.error(f"Response JSON: {api_error.response.json()}")
                    except Exception:
625 pass
626
627 # Check for specific error types
628 if hasattr(api_error, 'status_code'):
629 logger.error(f"API Status code: {api_error.status_code}")
630 if hasattr(api_error, 'body'):
631 logger.error(f"API Response body: {api_error.body}")
632 if hasattr(api_error, 'headers'):
633 logger.error(f"API Response headers: {api_error.headers}")
634
            # getattr guards against error objects that lack a status_code attribute
            status_code = getattr(api_error, 'status_code', None)
            if status_code == 413:
                logger.error("413 Payload Too Large - moving to errors directory")
                return None  # Move to errors directory - payload is too large to ever succeed
            elif status_code == 524:
                logger.error("524 error - timeout from Cloudflare, will retry later")
                return False  # Keep in queue for retry
641
642 # Check if error indicates we should remove from queue
643 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
644 logger.warning("Payload too large error, moving to errors directory")
645 return None # Move to errors directory - cannot be fixed by retry
646 elif 'status_code: 524' in error_str:
647 logger.warning("524 timeout error, keeping in queue for retry")
648 return False # Keep in queue for retry
649
650 raise
651
652 # Log successful response
653 logger.debug("Successfully received response from Letta API")
654 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
655
656 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
657 reply_candidates = []
658 tool_call_results = {} # Map tool_call_id to status
659 ack_note = None # Track any note from annotate_ack tool
660
661 logger.debug(f"Processing {len(message_response.messages)} response messages...")
662
663 # First pass: collect tool return statuses
664 ignored_notification = False
665 ignore_reason = ""
666 ignore_category = ""
667
668 for message in message_response.messages:
669 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
670 if message.name == 'add_post_to_bluesky_reply_thread':
671 tool_call_results[message.tool_call_id] = message.status
672 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}")
673 elif message.name == 'ignore_notification':
674 # Check if the tool was successful
675 if hasattr(message, 'tool_return') and message.status == 'success':
676 # Parse the return value to extract category and reason
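                        # Expected return format: "IGNORED_NOTIFICATION::<category>::<reason>"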
677 result_str = str(message.tool_return)
678 if 'IGNORED_NOTIFICATION::' in result_str:
679 parts = result_str.split('::')
680 if len(parts) >= 3:
681 ignore_category = parts[1]
682 ignore_reason = parts[2]
683 ignored_notification = True
684 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
685 elif message.name == 'bluesky_reply':
686 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
687 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
688 logger.error("Update the agent's tools using register_tools.py")
689 # Export agent state before terminating
690 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
691 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
692 exit(1)
693
694 # Second pass: process messages and check for successful tool calls
695 for i, message in enumerate(message_response.messages, 1):
696 # Log concise message info instead of full object
697 msg_type = getattr(message, 'message_type', 'unknown')
698 if hasattr(message, 'reasoning') and message.reasoning:
699 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
700 elif hasattr(message, 'tool_call') and message.tool_call:
701 tool_name = message.tool_call.name
702 logger.debug(f" {i}. {msg_type}: {tool_name}")
703 elif hasattr(message, 'tool_return'):
704 tool_name = getattr(message, 'name', 'unknown_tool')
705 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
706 status = getattr(message, 'status', 'unknown')
707 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
708 elif hasattr(message, 'text'):
709 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
710 else:
711 logger.debug(f" {i}. {msg_type}: <no content>")
712
713 # Check for halt_activity tool call
714 if hasattr(message, 'tool_call') and message.tool_call:
715 if message.tool_call.name == 'halt_activity':
716 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
717 try:
718 args = json.loads(message.tool_call.arguments)
719 reason = args.get('reason', 'Agent requested halt')
720 logger.info(f"Halt reason: {reason}")
                    except Exception:
722 logger.info("Halt reason: <unable to parse>")
723
724 # Delete the queue file before terminating
725 if queue_filepath and queue_filepath.exists():
726 queue_filepath.unlink()
727 logger.info(f"Deleted queue file: {queue_filepath.name}")
728
729 # Also mark as processed to avoid reprocessing
730 if NOTIFICATION_DB:
731 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed')
732 else:
733 processed_uris = load_processed_notifications()
734 processed_uris.add(notification_data.get('uri', ''))
735 save_processed_notifications(processed_uris)
736
737 # Export agent state before terminating
738 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
739
740 # Exit the program
741 logger.info("=== BOT TERMINATED BY AGENT ===")
742 exit(0)
743
744 # Check for deprecated bluesky_reply tool
745 if hasattr(message, 'tool_call') and message.tool_call:
746 if message.tool_call.name == 'bluesky_reply':
747 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
748 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
749 logger.error("Update the agent's tools using register_tools.py")
750 # Export agent state before terminating
751 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
752 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
753 exit(1)
754
755 # Collect annotate_ack tool calls
756 elif message.tool_call.name == 'annotate_ack':
757 try:
758 args = json.loads(message.tool_call.arguments)
759 note = args.get('note', '')
760 if note:
761 ack_note = note
762 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
763 except json.JSONDecodeError as e:
764 logger.error(f"Failed to parse annotate_ack arguments: {e}")
765
766 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
767 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
768 tool_call_id = message.tool_call.tool_call_id
769 tool_status = tool_call_results.get(tool_call_id, 'unknown')
770
771 if tool_status == 'success':
772 try:
773 args = json.loads(message.tool_call.arguments)
774 reply_text = args.get('text', '')
775 reply_lang = args.get('lang', 'en-US')
776
777 if reply_text: # Only add if there's actual content
778 reply_candidates.append((reply_text, reply_lang))
779 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
780 except json.JSONDecodeError as e:
781 logger.error(f"Failed to parse tool call arguments: {e}")
782 elif tool_status == 'error':
783 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
784 else:
785 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
786
787 # Check for conflicting tool calls
788 if reply_candidates and ignored_notification:
789 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
790 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
791 logger.warning("Item will be left in queue for manual review")
792 # Return False to keep in queue
793 return False
794
795 if reply_candidates:
796 # Aggregate reply posts into a thread
797 reply_messages = []
798 reply_langs = []
799 for text, lang in reply_candidates:
800 reply_messages.append(text)
801 reply_langs.append(lang)
802
803 # Use the first language for the entire thread (could be enhanced later)
804 reply_lang = reply_langs[0] if reply_langs else 'en-US'
805
806 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
807
808 # Display the generated reply thread
809 if len(reply_messages) == 1:
810 content = reply_messages[0]
811 title = f"Reply to @{author_handle}"
812 else:
813 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)])
814 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)"
815
816 # Format with Unicode characters
817 print(f"\n✎ {title}")
818 print(f" {'─' * len(title)}")
819 # Indent content lines
820 for line in content.split('\n'):
821 print(f" {line}")
822
823 # Send the reply(s) with language (unless in testing mode)
824 if testing_mode:
825 logger.info("TESTING MODE: Skipping actual Bluesky post")
826 response = True # Simulate success
827 else:
828 if len(reply_messages) == 1:
829 # Single reply - use existing function
830 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
831 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
832 response = bsky_utils.reply_to_notification(
833 client=atproto_client,
834 notification=notification_data,
835 reply_text=cleaned_text,
836 lang=reply_lang,
837 correlation_id=correlation_id
838 )
839 else:
840 # Multiple replies - use new threaded function
841 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
842 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
843 response = bsky_utils.reply_with_thread_to_notification(
844 client=atproto_client,
845 notification=notification_data,
846 reply_messages=cleaned_messages,
847 lang=reply_lang,
848 correlation_id=correlation_id
849 )
850
851 if response:
852 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={
853 'correlation_id': correlation_id,
854 'author_handle': author_handle,
855 'reply_count': len(reply_messages)
856 })
857
858 # Acknowledge the post we're replying to with stream.thought.ack
859 try:
860 post_uri = notification_data.get('uri')
861 post_cid = notification_data.get('cid')
862
863 if post_uri and post_cid:
864 ack_result = bsky_utils.acknowledge_post(
865 client=atproto_client,
866 post_uri=post_uri,
867 post_cid=post_cid,
868 note=ack_note
869 )
870 if ack_result:
871 if ack_note:
872 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")")
873 else:
874 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack")
875 else:
876 logger.warning(f"Failed to acknowledge post from @{author_handle}")
877 else:
878 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}")
879 except Exception as e:
880 logger.error(f"Error acknowledging post from @{author_handle}: {e}")
881 # Don't fail the entire operation if acknowledgment fails
882
883 return True
884 else:
885 logger.error(f"Failed to send reply to @{author_handle}")
886 return False
887 else:
888 # Check if notification was explicitly ignored
889 if ignored_notification:
890 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={
891 'correlation_id': correlation_id,
892 'author_handle': author_handle,
893 'ignore_category': ignore_category
894 })
895 return "ignored"
896 else:
897 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={
898 'correlation_id': correlation_id,
899 'author_handle': author_handle
900 })
901 return "no_reply"
902
903 except Exception as e:
904 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={
905 'correlation_id': correlation_id,
906 'error': str(e),
907 'error_type': type(e).__name__,
908 'author_handle': author_handle if 'author_handle' in locals() else 'unknown'
909 })
910 return False
911 finally:
912 # Detach user blocks after agent response (success or failure)
913 if 'attached_handles' in locals() and attached_handles:
914 try:
915 logger.info(f"Detaching user blocks for handles: {attached_handles}")
916 detach_result = detach_user_blocks(attached_handles, void_agent)
917 logger.debug(f"Detach result: {detach_result}")
918 except Exception as detach_error:
919 logger.warning(f"Failed to detach user blocks: {detach_error}")
920
921
922def notification_to_dict(notification):
923 """Convert a notification object to a dictionary for JSON serialization."""
924 return {
925 'uri': notification.uri,
926 'cid': notification.cid,
927 'reason': notification.reason,
928 'is_read': notification.is_read,
929 'indexed_at': notification.indexed_at,
930 'author': {
931 'handle': notification.author.handle,
932 'display_name': notification.author.display_name,
933 'did': notification.author.did
934 },
935 'record': {
936 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
937 }
938 }
939
940
941def load_processed_notifications():
942 """Load the set of processed notification URIs from database."""
943 global NOTIFICATION_DB
944 if NOTIFICATION_DB:
945 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)
946 return set()
947
948
949def save_processed_notifications(processed_set):
950 """Save the set of processed notification URIs to database."""
951 # This is now handled by marking individual notifications in the DB
952 # Keeping function for compatibility but it doesn't need to do anything
953 pass
954
955
956def save_notification_to_queue(notification, is_priority=None):
957 """Save a notification to the queue directory with priority-based filename."""
958 try:
959 global NOTIFICATION_DB
960
961 # Handle both notification objects and dicts
962 if isinstance(notification, dict):
963 notif_dict = notification
964 notification_uri = notification.get('uri')
965 else:
966 notif_dict = notification_to_dict(notification)
967 notification_uri = notification.uri
968
969 # Check if already processed (using database if available)
970 if NOTIFICATION_DB:
971 if NOTIFICATION_DB.is_processed(notification_uri):
972 logger.debug(f"Notification already processed (DB): {notification_uri}")
973 return False
974 # Add to database
975 NOTIFICATION_DB.add_notification(notif_dict)
976 else:
977 # Fall back to old JSON method
978 processed_uris = load_processed_notifications()
979 if notification_uri in processed_uris:
980 logger.debug(f"Notification already processed: {notification_uri}")
981 return False
982
983 # Create JSON string
984 notif_json = json.dumps(notif_dict, sort_keys=True)
985
986 # Generate hash for filename (to avoid duplicates)
987 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
988
989 # Determine priority based on author handle or explicit priority
990 if is_priority is not None:
991 priority_prefix = "0_" if is_priority else "1_"
992 else:
993 if isinstance(notification, dict):
994 author_handle = notification.get('author', {}).get('handle', '')
995 else:
996 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
997 # Prioritize cameron.pfiffer.org responses
998 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"
999
1000 # Create filename with priority, timestamp and hash
1001 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1002 reason = notif_dict.get('reason', 'unknown')
1003 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
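        # e.g. "0_20250101_120000_mention_0123abcd4567ef89.json" (illustrative values)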
1004 filepath = QUEUE_DIR / filename
1005
1006 # Check if this notification URI is already in the queue
1007 for existing_file in QUEUE_DIR.glob("*.json"):
1008 if existing_file.name == "processed_notifications.json":
1009 continue
1010 try:
1011 with open(existing_file, 'r') as f:
1012 existing_data = json.load(f)
1013 if existing_data.get('uri') == notification_uri:
1014 logger.debug(f"Notification already queued (URI: {notification_uri})")
1015 return False
            except Exception:
1017 continue
1018
1019 # Write to file
1020 with open(filepath, 'w') as f:
1021 json.dump(notif_dict, f, indent=2)
1022
1023 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
1024 logger.info(f"Queued notification ({priority_label}): {filename}")
1025 return True
1026
1027 except Exception as e:
1028 logger.error(f"Error saving notification to queue: {e}")
1029 return False
1030
1031
1032def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
1033 """Load and process all notifications from the queue in priority order."""
1034 try:
1035 # Get all JSON files in queue directory (excluding processed_notifications.json)
1036 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
1037 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1038
1039 # Filter out and delete like notifications immediately
1040 queue_files = []
1041 likes_deleted = 0
1042
1043 for filepath in all_queue_files:
1044 try:
1045 with open(filepath, 'r') as f:
1046 notif_data = json.load(f)
1047
1048 # If it's a like, delete it immediately and don't process
1049 if notif_data.get('reason') == 'like':
1050 filepath.unlink()
1051 likes_deleted += 1
1052 logger.debug(f"Deleted like notification: {filepath.name}")
1053 else:
1054 queue_files.append(filepath)
1055 except Exception as e:
1056 logger.warning(f"Error checking notification file {filepath.name}: {e}")
1057 queue_files.append(filepath) # Keep it in case it's valid
1058
1059 if likes_deleted > 0:
1060 logger.info(f"Deleted {likes_deleted} like notifications from queue")
1061
1062 if not queue_files:
1063 return
1064
1065 logger.info(f"Processing {len(queue_files)} queued notifications")
1066
1067 # Log current statistics
1068 elapsed_time = time.time() - start_time
1069 total_messages = sum(message_counters.values())
1070 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1071
1072 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
1073
1074 for i, filepath in enumerate(queue_files, 1):
1075 # Determine if this is a priority notification
1076 is_priority = filepath.name.startswith("0_")
1077
1078 # Check for new notifications periodically during queue processing
1079 # Also check immediately after processing each priority item
1080 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
1081
1082 # If we just processed a priority item, immediately check for new priority notifications
1083 if is_priority and i > 1:
1084 should_check_notifications = True
1085
1086 if should_check_notifications:
1087 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
1088 try:
1089 # Fetch and queue new notifications without processing them
1090 new_count = fetch_and_queue_new_notifications(atproto_client)
1091
1092 if new_count > 0:
1093 logger.info(f"Added {new_count} new notifications to queue")
1094 # Reload the queue files to include the new items
1095 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1096 queue_files = updated_queue_files
1097 logger.info(f"Queue updated: now {len(queue_files)} total items")
1098 except Exception as e:
1099 logger.error(f"Error checking for new notifications: {e}")
1100
1101 priority_label = " [PRIORITY]" if is_priority else ""
1102 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
1103 try:
1104 # Load notification data
1105 with open(filepath, 'r') as f:
1106 notif_data = json.load(f)
1107
1108 # Process based on type using dict data directly
1109 success = False
1110 if notif_data['reason'] == "mention":
1111 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success is True:
1113 message_counters['mentions'] += 1
1114 elif notif_data['reason'] == "reply":
1115 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success is True:
1117 message_counters['replies'] += 1
1118 elif notif_data['reason'] == "follow":
1119 author_handle = notif_data['author']['handle']
1120 author_display_name = notif_data['author'].get('display_name', 'no display name')
1121 follow_update = f"@{author_handle} ({author_display_name}) started following you."
1122 follow_message = f"Update: {follow_update}"
1123 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
1124 CLIENT.agents.messages.create(
1125 agent_id = void_agent.id,
1126 messages = [{"role":"user", "content": follow_message}]
1127 )
1128 success = True # Follow updates are always successful
1129 if success:
1130 message_counters['follows'] += 1
1131 elif notif_data['reason'] == "repost":
1132 # Skip reposts silently
1133 success = True # Skip reposts but mark as successful to remove from queue
1134 if success:
1135 message_counters['reposts_skipped'] += 1
1136 elif notif_data['reason'] == "like":
1137 # Skip likes silently
1138 success = True # Skip likes but mark as successful to remove from queue
1139 if success:
1140 message_counters.setdefault('likes_skipped', 0)
1141 message_counters['likes_skipped'] += 1
1142 else:
1143 logger.warning(f"Unknown notification type: {notif_data['reason']}")
1144 success = True # Remove unknown types from queue
1145
1146 # Handle file based on processing result
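                # process_mention returns: True (done), False (retry), None (move to errors dir),
                # "no_reply" (move to no_reply dir), "ignored" (delete)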
                if success is True:
1148 if testing_mode:
1149 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
1150 else:
1151 filepath.unlink()
1152 logger.info(f"Successfully processed and removed: {filepath.name}")
1153
1154 # Mark as processed to avoid reprocessing
1155 if NOTIFICATION_DB:
1156 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed')
1157 else:
1158 processed_uris = load_processed_notifications()
1159 processed_uris.add(notif_data['uri'])
1160 save_processed_notifications(processed_uris)
1161
1162 elif success is None: # Special case for moving to error directory
1163 error_path = QUEUE_ERROR_DIR / filepath.name
1164 filepath.rename(error_path)
1165 logger.warning(f"Moved {filepath.name} to errors directory")
1166
1167 # Also mark as processed to avoid retrying
1168 if NOTIFICATION_DB:
1169 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1170 else:
1171 processed_uris = load_processed_notifications()
1172 processed_uris.add(notif_data['uri'])
1173 save_processed_notifications(processed_uris)
1174
1175 elif success == "no_reply": # Special case for moving to no_reply directory
1176 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
1177 filepath.rename(no_reply_path)
1178 logger.info(f"Moved {filepath.name} to no_reply directory")
1179
1180 # Also mark as processed to avoid retrying
1181 if NOTIFICATION_DB:
1182 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1183 else:
1184 processed_uris = load_processed_notifications()
1185 processed_uris.add(notif_data['uri'])
1186 save_processed_notifications(processed_uris)
1187
1188 elif success == "ignored": # Special case for explicitly ignored notifications
1189 # For ignored notifications, we just delete them (not move to no_reply)
1190 filepath.unlink()
1191 logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
1192
1193 # Also mark as processed to avoid retrying
1194 if NOTIFICATION_DB:
1195 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1196 else:
1197 processed_uris = load_processed_notifications()
1198 processed_uris.add(notif_data['uri'])
1199 save_processed_notifications(processed_uris)
1200
1201 else:
1202 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
1203
1204 except Exception as e:
1205 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
1206 # Keep the file for retry later
1207
1208 except Exception as e:
1209 logger.error(f"Error loading queued notifications: {e}")
1210
1211
1212def fetch_and_queue_new_notifications(atproto_client):
1213 """Fetch new notifications and queue them without processing."""
1214 try:
1215 global NOTIFICATION_DB
1216
1217 # Get current time for marking notifications as seen
1218 logger.debug("Getting current time for notification marking...")
1219 last_seen_at = atproto_client.get_current_time_iso()
1220
1221 # Get timestamp of last processed notification for filtering
1222 last_processed_time = None
1223 if NOTIFICATION_DB:
1224 last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
1225 if last_processed_time:
1226 logger.debug(f"Last processed notification was at: {last_processed_time}")
1227
1228 # Fetch ALL notifications using pagination
1229 all_notifications = []
1230 cursor = None
1231 page_count = 0
1232 max_pages = 20 # Safety limit to prevent infinite loops
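        # With up to 100 notifications per page, a single poll fetches at most 2,000 notifications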
1233
1234 while page_count < max_pages:
1235 try:
1236 # Fetch notifications page
1237 if cursor:
1238 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1239 params={'cursor': cursor, 'limit': 100}
1240 )
1241 else:
1242 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1243 params={'limit': 100}
1244 )
1245
1246 page_count += 1
1247 page_notifications = notifications_response.notifications
1248
1249 if not page_notifications:
1250 break
1251
1252 all_notifications.extend(page_notifications)
1253
1254 # Check if there are more pages
1255 cursor = getattr(notifications_response, 'cursor', None)
1256 if not cursor:
1257 break
1258
1259 except Exception as e:
1260 logger.error(f"Error fetching notifications page {page_count}: {e}")
1261 break
1262
1263 # Now process all fetched notifications
1264 new_count = 0
1265 if all_notifications:
1266 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")
1267
1268 # Mark as seen first
1269 try:
1270 atproto_client.app.bsky.notification.update_seen(
1271 data={'seenAt': last_seen_at}
1272 )
1273 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
1274 except Exception as e:
1275 logger.error(f"Error marking notifications as seen: {e}")
1276
1277 # Debug counters
1278 skipped_read = 0
1279 skipped_likes = 0
1280 skipped_processed = 0
1281 skipped_old_timestamp = 0
1282 processed_uris = load_processed_notifications()
1283
1284 # Queue all new notifications (except likes)
1285 for notif in all_notifications:
1286 # Skip if older than last processed (when we have timestamp filtering)
1287 if last_processed_time and hasattr(notif, 'indexed_at'):
1288 if notif.indexed_at <= last_processed_time:
1289 skipped_old_timestamp += 1
1290 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
1291 continue
1292
1293 # Debug: Log is_read status but DON'T skip based on it
1294 if hasattr(notif, 'is_read') and notif.is_read:
1295 skipped_read += 1
1296 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")
1297
1298 # Skip likes
1299 if hasattr(notif, 'reason') and notif.reason == 'like':
1300 skipped_likes += 1
1301 continue
1302
1303 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif
1304
1305 # Skip likes in dict form too
1306 if notif_dict.get('reason') == 'like':
1307 continue
1308
1309 # Check if already processed
1310 notif_uri = notif_dict.get('uri', '')
1311 if notif_uri in processed_uris:
1312 skipped_processed += 1
1313 logger.debug(f"Skipping already processed: {notif_uri}")
1314 continue
1315
1316 # Check if it's a priority notification
1317 is_priority = False
1318
1319 # Priority for cameron.pfiffer.org notifications
1320 author_handle = notif_dict.get('author', {}).get('handle', '')
1321 if author_handle == "cameron.pfiffer.org":
1322 is_priority = True
1323
1324 # Also check for priority keywords in mentions
1325 if notif_dict.get('reason') == 'mention':
1326 # Get the mention text to check for priority keywords
1327 record = notif_dict.get('record', {})
1328 text = record.get('text', '')
1329 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']):
1330 is_priority = True
1331
1332 if save_notification_to_queue(notif_dict, is_priority=is_priority):
1333 new_count += 1
1334 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")
1335
1336 # Log summary of filtering
1337 logger.info(f"📊 Notification processing summary:")
1338 logger.info(f" • Total fetched: {len(all_notifications)}")
1339 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)")
1340 logger.info(f" • Skipped (likes): {skipped_likes}")
1341 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}")
1342 logger.info(f" • Skipped (already processed): {skipped_processed}")
1343 logger.info(f" • Queued for processing: {new_count}")
1344 else:
1345 logger.debug("No new notifications to queue")
1346
1347 return new_count
1348
1349 except Exception as e:
1350 logger.error(f"Error fetching and queueing notifications: {e}")
1351 return 0
1352
1353
1354def process_notifications(void_agent, atproto_client, testing_mode=False):
1355 """Fetch new notifications, queue them, and process the queue."""
1356 try:
1357 # Fetch and queue new notifications
1358 new_count = fetch_and_queue_new_notifications(atproto_client)
1359
1360 if new_count > 0:
1361 logger.info(f"Found {new_count} new notifications to process")
1362
1363 # Now process the entire queue (old + new notifications)
1364 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
1365
1366 except Exception as e:
1367 logger.error(f"Error processing notifications: {e}")
1368
1369
1370def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
1371 """
    Send a synthesis message to the agent; intended to be called periodically (every 10 minutes).
1373 This prompts the agent to synthesize its recent experiences.
1374
1375 Args:
1376 client: Letta client
1377 agent_id: Agent ID to send synthesis to
1378 atproto_client: Optional AT Protocol client for posting synthesis results
1379 """
1380 # Track attached temporal blocks for cleanup
1381 attached_temporal_labels = []
1382
1383 try:
1384 logger.info("🧠 Preparing synthesis with temporal journal blocks")
1385
1386 # Attach temporal blocks before synthesis
1387 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
1388 if not success:
1389 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")
1390
1391 # Create enhanced synthesis prompt
1392 today = date.today()
1393 synthesis_prompt = f"""Time for synthesis and reflection.
1394
1395You have access to temporal journal blocks for recording your thoughts and experiences:
1396- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
1397- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
1398- void_year_{today.year}: This year's journal ({today.year})
1399
1400These journal blocks are attached temporarily for this synthesis session. Use them to:
14011. Record significant interactions and insights from recent experiences
14022. Track patterns in conversations and user behaviors
14033. Note your evolving understanding of the digital social environment
14044. Reflect on your growth and changes in perspective
14055. Document memorable moments or interesting discoveries
1406
1407The journal entries should be cumulative - add to existing content rather than replacing it.
1408Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).
1409
1410After recording in your journals, synthesize your recent experiences into your core memory blocks
1411(zeitgeist, void-persona, void-humans) as you normally would.
1412
1413Begin your synthesis and journaling now."""
1414
1415 logger.info("🧠 Sending enhanced synthesis prompt to agent")
1416
1417 # Send synthesis message with streaming to show tool use
1418 message_stream = client.agents.messages.create_stream(
1419 agent_id=agent_id,
1420 messages=[{"role": "user", "content": synthesis_prompt}],
1421 stream_tokens=False,
1422 max_steps=100
1423 )
1424
1425 # Track synthesis content for potential posting
1426 synthesis_posts = []
1427 ack_note = None
1428
1429 # Process the streaming response
1430 for chunk in message_stream:
1431 if hasattr(chunk, 'message_type'):
1432 if chunk.message_type == 'reasoning_message':
1433 if SHOW_REASONING:
1434 print("\n◆ Reasoning")
1435 print(" ─────────")
1436 for line in chunk.reasoning.split('\n'):
1437 print(f" {line}")
1438
1439 # Create ATProto record for reasoning (if we have atproto client)
1440 if atproto_client and hasattr(chunk, 'reasoning'):
1441 try:
1442 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
1443 except Exception as e:
1444 logger.debug(f"Failed to create reasoning record during synthesis: {e}")
1445 elif chunk.message_type == 'tool_call_message':
1446 tool_name = chunk.tool_call.name
1447
1448 # Create ATProto record for tool call (if we have atproto client)
1449 if atproto_client:
1450 try:
1451 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
1452 bsky_utils.create_tool_call_record(
1453 atproto_client,
1454 tool_name,
1455 chunk.tool_call.arguments,
1456 tool_call_id
1457 )
1458 except Exception as e:
1459 logger.debug(f"Failed to create tool call record during synthesis: {e}")
1460 try:
1461 args = json.loads(chunk.tool_call.arguments)
1462 if tool_name == 'archival_memory_search':
1463 query = args.get('query', 'unknown')
1464 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
1465 elif tool_name == 'archival_memory_insert':
1466 content = args.get('content', '')
1467 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
                        elif tool_name == 'update_block':
                            label = args.get('label', 'unknown')
                            value = str(args.get('value', ''))
                            value_preview = value[:100] + "..." if len(value) > 100 else value
                            log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
1472 elif tool_name == 'annotate_ack':
1473 note = args.get('note', '')
1474 if note:
1475 ack_note = note
1476 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
1477 elif tool_name == 'add_post_to_bluesky_reply_thread':
1478 text = args.get('text', '')
1479 synthesis_posts.append(text)
1480 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
1481 else:
1482 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
1483 if len(args_str) > 150:
1484 args_str = args_str[:150] + "..."
1485 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                    except (json.JSONDecodeError, AttributeError, TypeError):
                        # Fall back to the raw arguments string if parsing fails
                        raw_args = str(chunk.tool_call.arguments)
                        log_with_panel(raw_args[:150] + "..." if len(raw_args) > 150 else raw_args, f"Tool call: {tool_name}", "blue")
1488 elif chunk.message_type == 'tool_return_message':
1489 if chunk.status == 'success':
1490 log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green")
1491 else:
1492 log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red")
1493 elif chunk.message_type == 'assistant_message':
1494 print("\n▶ Synthesis Response")
1495 print(" ──────────────────")
1496 for line in chunk.content.split('\n'):
1497 print(f" {line}")
1498
1499 if str(chunk) == 'done':
1500 break
1501
1502 logger.info("🧠 Synthesis message processed successfully")
1503
1504 # Handle synthesis acknowledgments if we have an atproto client
1505 if atproto_client and ack_note:
1506 try:
1507 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
1508 if result:
1509 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
1510 else:
1511 logger.warning("Failed to create synthesis acknowledgment")
1512 except Exception as e:
1513 logger.error(f"Error creating synthesis acknowledgment: {e}")
1514
1515 # Handle synthesis posts if any were generated
1516 if atproto_client and synthesis_posts:
1517 try:
1518 for post_text in synthesis_posts:
1519 cleaned_text = bsky_utils.remove_outside_quotes(post_text)
1520 response = bsky_utils.send_post(atproto_client, cleaned_text)
1521 if response:
1522 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
1523 else:
1524 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
1525 except Exception as e:
1526 logger.error(f"Error posting synthesis content: {e}")
1527
1528 except Exception as e:
1529 logger.error(f"Error sending synthesis message: {e}")
1530 finally:
1531 # Always detach temporal blocks after synthesis
1532 if attached_temporal_labels:
1533 logger.info("🧠 Detaching temporal journal blocks after synthesis")
1534 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels)
1535 if not detach_success:
1536 logger.warning("Some temporal blocks may not have been detached properly")
1537
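# Usage sketch (assumes CLIENT and an initialized agent, as in main()):
#   send_synthesis_message(CLIENT, void_agent.id, atproto_client=None)
# runs a journaling/synthesis pass without creating any Bluesky records or posts.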
1538
1539def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
1540 """
1541 Detach all user blocks from the agent to prevent memory bloat.
1542 This should be called periodically to ensure clean state.
1543 """
1544 try:
1545 # Get all blocks attached to the agent
1546 attached_blocks = client.agents.blocks.list(agent_id=agent_id)
1547
1548 user_blocks_to_detach = []
1549 for block in attached_blocks:
1550 if hasattr(block, 'label') and block.label.startswith('user_'):
1551 user_blocks_to_detach.append({
1552 'label': block.label,
1553 'id': block.id
1554 })
1555
1556 if not user_blocks_to_detach:
1557 logger.debug("No user blocks found to detach during periodic cleanup")
1558 return
1559
1560 # Detach each user block
1561 detached_count = 0
1562 for block_info in user_blocks_to_detach:
1563 try:
1564 client.agents.blocks.detach(
1565 agent_id=agent_id,
1566 block_id=str(block_info['id'])
1567 )
1568 detached_count += 1
1569 logger.debug(f"Detached user block: {block_info['label']}")
1570 except Exception as e:
1571 logger.warning(f"Failed to detach block {block_info['label']}: {e}")
1572
1573 if detached_count > 0:
1574 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")
1575
1576 except Exception as e:
1577 logger.error(f"Error during periodic user block cleanup: {e}")
1578
1579
1580def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple:
1581 """
1582 Attach temporal journal blocks (day, month, year) to the agent for synthesis.
1583 Creates blocks if they don't exist.
1584
1585 Returns:
1586 Tuple of (success: bool, attached_labels: list)
1587 """
1588 try:
1589 today = date.today()
1590
1591 # Generate temporal block labels
1592 day_label = f"void_day_{today.strftime('%Y_%m_%d')}"
1593 month_label = f"void_month_{today.strftime('%Y_%m')}"
1594 year_label = f"void_year_{today.year}"
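        # e.g. on 2025-07-04 these are: void_day_2025_07_04, void_month_2025_07, void_year_2025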
1595
1596 temporal_labels = [day_label, month_label, year_label]
1597 attached_labels = []
1598
1599 # Get current blocks attached to agent
1600 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1601 current_block_labels = {block.label for block in current_blocks}
1602 current_block_ids = {str(block.id) for block in current_blocks}
1603
1604 for label in temporal_labels:
1605 try:
1606 # Skip if already attached
1607 if label in current_block_labels:
1608 logger.debug(f"Temporal block already attached: {label}")
1609 attached_labels.append(label)
1610 continue
1611
1612 # Check if block exists globally
1613 blocks = client.blocks.list(label=label)
1614
                if blocks:
1616 block = blocks[0]
1617 # Check if already attached by ID
1618 if str(block.id) in current_block_ids:
1619 logger.debug(f"Temporal block already attached by ID: {label}")
1620 attached_labels.append(label)
1621 continue
1622 else:
1623 # Create new temporal block with appropriate header
1624 if "day" in label:
1625 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}"
1626 initial_content = f"{header}\n\nNo entries yet for today."
1627 elif "month" in label:
1628 header = f"# Monthly Journal - {today.strftime('%B %Y')}"
1629 initial_content = f"{header}\n\nNo entries yet for this month."
1630 else: # year
1631 header = f"# Yearly Journal - {today.year}"
1632 initial_content = f"{header}\n\nNo entries yet for this year."
1633
1634 block = client.blocks.create(
1635 label=label,
1636 value=initial_content,
1637 limit=10000 # Larger limit for journal blocks
1638 )
1639 logger.info(f"Created new temporal block: {label}")
1640
1641 # Attach the block
1642 client.agents.blocks.attach(
1643 agent_id=agent_id,
1644 block_id=str(block.id)
1645 )
1646 attached_labels.append(label)
1647 logger.info(f"Attached temporal block: {label}")
1648
1649 except Exception as e:
1650 # Check for duplicate constraint errors
1651 error_str = str(e)
1652 if "duplicate key value violates unique constraint" in error_str:
1653 logger.debug(f"Temporal block already attached (constraint): {label}")
1654 attached_labels.append(label)
1655 else:
1656 logger.warning(f"Failed to attach temporal block {label}: {e}")
1657
        logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
        # Report partial failure so the caller's warning actually fires
        return len(attached_labels) == len(temporal_labels), attached_labels
1660
1661 except Exception as e:
1662 logger.error(f"Error attaching temporal blocks: {e}")
1663 return False, []
1664
1665
1666def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool:
1667 """
1668 Detach temporal journal blocks from the agent after synthesis.
1669
1670 Args:
1671 client: Letta client
1672 agent_id: Agent ID
1673 labels_to_detach: Optional list of specific labels to detach.
1674 If None, detaches all temporal blocks.
1675
1676 Returns:
1677 bool: Success status
1678 """
1679 try:
1680 # If no specific labels provided, generate today's labels
1681 if labels_to_detach is None:
1682 today = date.today()
1683 labels_to_detach = [
1684 f"void_day_{today.strftime('%Y_%m_%d')}",
1685 f"void_month_{today.strftime('%Y_%m')}",
1686 f"void_year_{today.year}"
1687 ]
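            # The default only covers today's labels; a synthesis pass that started
            # before midnight may have attached yesterday's blocks, so callers normally
            # pass the explicit list returned by attach_temporal_blocks().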
1688
1689 # Get current blocks and build label to ID mapping
1690 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1691 block_label_to_id = {block.label: str(block.id) for block in current_blocks}
1692
1693 detached_count = 0
1694 for label in labels_to_detach:
1695 if label in block_label_to_id:
1696 try:
1697 client.agents.blocks.detach(
1698 agent_id=agent_id,
1699 block_id=block_label_to_id[label]
1700 )
1701 detached_count += 1
1702 logger.debug(f"Detached temporal block: {label}")
1703 except Exception as e:
1704 logger.warning(f"Failed to detach temporal block {label}: {e}")
1705 else:
1706 logger.debug(f"Temporal block not attached: {label}")
1707
1708 logger.info(f"Detached {detached_count} temporal blocks")
1709 return True
1710
1711 except Exception as e:
1712 logger.error(f"Error detaching temporal blocks: {e}")
1713 return False
1714
1715
1716def main():
1717 # Parse command line arguments
1718 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
1719 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
1720 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
1721 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
1722 # --rich option removed as we now use simple text formatting
1723 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
1724 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
1725 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
1726 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
1727 args = parser.parse_args()
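    # Example invocations (entry-point filename assumed):
    #   python void_bot.py --test --reasoning
    #   python void_bot.py --synthesis-only --synthesis-interval 1800 --no-git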
1728
1729 # Configure logging based on command line arguments
1730 if args.simple_logs:
1731 log_format = "void - %(levelname)s - %(message)s"
1732 else:
1733 # Create custom formatter with symbols
1734 class SymbolFormatter(logging.Formatter):
1735 """Custom formatter that adds symbols for different log levels"""
1736
1737 SYMBOLS = {
1738 logging.DEBUG: '◇',
1739 logging.INFO: '✓',
1740 logging.WARNING: '⚠',
1741 logging.ERROR: '✗',
1742 logging.CRITICAL: '‼'
1743 }
1744
1745 def format(self, record):
1746 # Get the symbol for this log level
1747 symbol = self.SYMBOLS.get(record.levelno, '•')
1748
1749 # Format time as HH:MM:SS
1750 timestamp = self.formatTime(record, "%H:%M:%S")
1751
1752 # Build the formatted message
1753 level_name = f"{record.levelname:<5}" # Left-align, 5 chars
1754
1755 # Use vertical bar as separator
1756 parts = [symbol, timestamp, '│', level_name, '│', record.getMessage()]
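                # Produces lines like: ✓ 12:03:45 │ INFO  │ Connected to Bluesky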
1757
1758 return ' '.join(parts)
1759
1760 # Reset logging configuration
1761 for handler in logging.root.handlers[:]:
1762 logging.root.removeHandler(handler)
1763
1764 # Create handler with custom formatter
1765 handler = logging.StreamHandler()
1766 if not args.simple_logs:
1767 handler.setFormatter(SymbolFormatter())
1768 else:
1769 handler.setFormatter(logging.Formatter(log_format))
1770
1771 # Configure root logger
1772 logging.root.setLevel(logging.INFO)
1773 logging.root.addHandler(handler)
1774
    global logger, prompt_logger
1776 logger = logging.getLogger("void_bot")
1777 logger.setLevel(logging.INFO)
1778
1779 # Create a separate logger for prompts (set to WARNING to hide by default)
1780 prompt_logger = logging.getLogger("void_bot.prompts")
1781 if args.reasoning:
1782 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used
1783 else:
1784 prompt_logger.setLevel(logging.WARNING) # Hide by default
1785
1786 # Disable httpx logging completely
1787 logging.getLogger("httpx").setLevel(logging.CRITICAL)
1788
    # Rich console removed - plain text formatting is used instead
1791
1792 global TESTING_MODE, SKIP_GIT, SHOW_REASONING
1793 TESTING_MODE = args.test
1794
1795 # Store no-git flag globally for use in export_agent_state calls
1796 SKIP_GIT = args.no_git
1797
    # Rich formatting flag removed - no longer needed
1800
1801 # Store reasoning flag globally
1802 SHOW_REASONING = args.reasoning
1803
1804 if TESTING_MODE:
1805 logger.info("=== RUNNING IN TESTING MODE ===")
1806 logger.info(" - No messages will be sent to Bluesky")
1807 logger.info(" - Queue files will not be deleted")
1808 logger.info(" - Notifications will not be marked as seen")
1809 print("\n")
1810
1811 # Check for synthesis-only mode
1812 SYNTHESIS_ONLY = args.synthesis_only
1813 if SYNTHESIS_ONLY:
1814 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
1815 logger.info(" - Only synthesis messages will be sent")
1816 logger.info(" - No notification processing")
1817 logger.info(" - No Bluesky client needed")
1818 print("\n")
    # Main bot loop: continuously monitor for notifications.
1820 global start_time
1821 start_time = time.time()
1822 logger.info("=== STARTING VOID BOT ===")
1823 void_agent = initialize_void()
1824 logger.info(f"Void agent initialized: {void_agent.id}")
1825
1826 # Initialize notification database
1827 global NOTIFICATION_DB
1828 logger.info("Initializing notification database...")
1829 NOTIFICATION_DB = NotificationDB()
1830
1831 # Migrate from old JSON format if it exists
1832 if PROCESSED_NOTIFICATIONS_FILE.exists():
1833 logger.info("Found old processed_notifications.json, migrating to database...")
1834 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))
1835
1836 # Log database stats
1837 db_stats = NOTIFICATION_DB.get_stats()
1838 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")
1839
1840 # Clean up old records
1841 NOTIFICATION_DB.cleanup_old_records(days=7)
1842
1843 # Ensure correct tools are attached for Bluesky
1844 logger.info("Configuring tools for Bluesky platform...")
1845 try:
1846 from tool_manager import ensure_platform_tools
1847 ensure_platform_tools('bluesky', void_agent.id)
1848 except Exception as e:
1849 logger.error(f"Failed to configure platform tools: {e}")
1850 logger.warning("Continuing with existing tool configuration")
1851
1852 # Check if agent has required tools
1853 if hasattr(void_agent, 'tools') and void_agent.tools:
1854 tool_names = [tool.name for tool in void_agent.tools]
1855 # Check for bluesky-related tools
1856 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
1857 if not bluesky_tools:
1858 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
1859 else:
1860 logger.warning("Agent has no tools registered!")
1861
1862 # Clean up all user blocks at startup
1863 logger.info("🧹 Cleaning up user blocks at startup...")
1864 periodic_user_block_cleanup(CLIENT, void_agent.id)
1865
1866 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
1867 if not SYNTHESIS_ONLY:
1868 atproto_client = bsky_utils.default_login()
1869 logger.info("Connected to Bluesky")
1870 else:
1871 # In synthesis-only mode, still connect for acks and posts (unless in test mode)
1872 if not args.test:
1873 atproto_client = bsky_utils.default_login()
1874 logger.info("Connected to Bluesky (for synthesis acks/posts)")
1875 else:
1876 atproto_client = None
1877 logger.info("Skipping Bluesky connection (test mode)")
1878
1879 # Configure intervals
1880 CLEANUP_INTERVAL = args.cleanup_interval
1881 SYNTHESIS_INTERVAL = args.synthesis_interval
1882
1883 # Synthesis-only mode
1884 if SYNTHESIS_ONLY:
1885 if SYNTHESIS_INTERVAL <= 0:
1886 logger.error("Synthesis-only mode requires --synthesis-interval > 0")
1887 return
1888
1889 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
1890
1891 while True:
1892 try:
                # Send synthesis message (runs immediately on the first pass, then once per interval)
1894 logger.info("🧠 Sending synthesis message")
1895 send_synthesis_message(CLIENT, void_agent.id, atproto_client)
1896
1897 # Wait for next interval
1898 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
1899 sleep(SYNTHESIS_INTERVAL)
1900
            except KeyboardInterrupt:
                logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
                return  # exit main(); synthesis-only mode must not fall through to notification processing
1904 except Exception as e:
1905 logger.error(f"Error in synthesis loop: {e}")
1906 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
1907 sleep(SYNTHESIS_INTERVAL)
1908
1909 # Normal mode with notification processing
1910 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
1911
1912 cycle_count = 0
1913
1914 if CLEANUP_INTERVAL > 0:
1915 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
1916 else:
1917 logger.info("User block cleanup disabled")
1918
1919 if SYNTHESIS_INTERVAL > 0:
1920 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
1921 else:
1922 logger.info("Synthesis messages disabled")
1923
1924 while True:
1925 try:
1926 cycle_count += 1
1927 process_notifications(void_agent, atproto_client, TESTING_MODE)
1928
1929 # Check if synthesis interval has passed
1930 if SYNTHESIS_INTERVAL > 0:
1931 current_time = time.time()
1932 global last_synthesis_time
1933 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
1934 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
1935 send_synthesis_message(CLIENT, void_agent.id, atproto_client)
1936 last_synthesis_time = current_time
1937
1938 # Run periodic cleanup every N cycles
1939 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
1940 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
1941 periodic_user_block_cleanup(CLIENT, void_agent.id)
1942
1943 # Also check database health when doing cleanup
1944 if NOTIFICATION_DB:
1945 db_stats = NOTIFICATION_DB.get_stats()
1946 pending = db_stats.get('status_pending', 0)
1947 errors = db_stats.get('status_error', 0)
1948
1949 if pending > 50:
1950 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
1951 if errors > 20:
1952 logger.warning(f"⚠️ Queue health check: {errors} error notifications")
1953
1954 # Periodic cleanup of old records
                    if cycle_count % (CLEANUP_INTERVAL * 10) == 0:  # every CLEANUP_INTERVAL * 10 cycles (100 with the default)
1956 logger.info("Running database cleanup of old records...")
1957 NOTIFICATION_DB.cleanup_old_records(days=7)
1958
1959 # Log cycle completion with stats
1960 elapsed_time = time.time() - start_time
1961 total_messages = sum(message_counters.values())
1962 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1963
1964 if total_messages > 0:
1965 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
1966 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
1967
1968 except KeyboardInterrupt:
1969 # Final stats
1970 elapsed_time = time.time() - start_time
1971 total_messages = sum(message_counters.values())
1972 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1973
1974 logger.info("=== BOT STOPPED BY USER ===")
1975 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
1976 logger.info(f" - {message_counters['mentions']} mentions")
1977 logger.info(f" - {message_counters['replies']} replies")
1978 logger.info(f" - {message_counters['follows']} follows")
1979 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
1980 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
1981
1982 # Close database connection
1983 if NOTIFICATION_DB:
1984 logger.info("Closing database connection...")
1985 NOTIFICATION_DB.close()
1986
1987 break
1988 except Exception as e:
1989 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
1990 logger.error(f"Error details: {e}")
1991 # Wait a bit longer on errors
1992 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
1993 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
1994
1995
1996if __name__ == "__main__":
1997 main()