"""a digital person for bluesky"""
1# Rich imports removed - using simple text formatting
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20
21import bsky_utils
22from tools.blocks import attach_user_blocks, detach_user_blocks
23from datetime import date
24from notification_db import NotificationDB
25
26def extract_handles_from_data(data):
27 """Recursively extract all unique handles from nested data structure."""
28 handles = set()
29
30 def _extract_recursive(obj):
31 if isinstance(obj, dict):
32 # Check if this dict has a 'handle' key
33 if 'handle' in obj:
34 handles.add(obj['handle'])
35 # Recursively check all values
36 for value in obj.values():
37 _extract_recursive(value)
38 elif isinstance(obj, list):
39 # Recursively check all list items
40 for item in obj:
41 _extract_recursive(item)
42
43 _extract_recursive(data)
44 return list(handles)
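
# Illustrative call, assuming the usual Bluesky notification shape (keys/values are examples only):
#   extract_handles_from_data({
#       'author': {'handle': 'alice.bsky.social'},
#       'record': {'reply': {'parent': {'author': {'handle': 'bob.bsky.social'}}}},
#   })
#   -> ['alice.bsky.social', 'bob.bsky.social']  (deduplicated via the set; order not guaranteed)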
45
46# Logging will be configured after argument parsing
47logger = None
48prompt_logger = None
49# Simple text formatting (Rich no longer used)
50SHOW_REASONING = False
51last_archival_query = "archival memory search"
52
53def log_with_panel(message, title=None, border_color="white"):
54 """Log a message with Unicode box-drawing characters"""
55 if title:
56 # Map old color names to appropriate symbols
57 symbol_map = {
58 "blue": "⚙", # Tool calls
59 "green": "✓", # Success/completion
60 "yellow": "◆", # Reasoning
61 "red": "✗", # Errors
62 "white": "▶", # Default/mentions
63 "cyan": "✎", # Posts
64 }
65 symbol = symbol_map.get(border_color, "▶")
66
67 print(f"\n{symbol} {title}")
68 print(f" {'─' * len(title)}")
69 # Indent message lines
70 for line in message.split('\n'):
71 print(f" {line}")
72 else:
73 print(message)
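
# Usage sketch (hypothetical values):
#   log_with_panel("Post queued successfully", title="Bluesky Post", border_color="green")
#   prints a "✓ Bluesky Post" header, a matching underline, and the message indented beneath it.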
74
75
76# Create a client with extended timeout for LLM operations
CLIENT = Letta(
78 token=os.environ["LETTA_API_KEY"],
79 timeout=600 # 10 minutes timeout for API calls - higher than Cloudflare's 524 timeout
80)
81
82# Use the "Bluesky" project
83PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"
84
85# Notification check delay
86FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response
87
88# Check for new notifications every N queue items
89CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing
90
91# Queue directory
92QUEUE_DIR = Path("queue")
93QUEUE_DIR.mkdir(exist_ok=True)
94QUEUE_ERROR_DIR = Path("queue/errors")
95QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
96QUEUE_NO_REPLY_DIR = Path("queue/no_reply")
97QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
98PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")
99
100# Maximum number of processed notifications to track
101MAX_PROCESSED_NOTIFICATIONS = 10000
102
103# Message tracking counters
104message_counters = defaultdict(int)
105start_time = time.time()
106
107# Testing mode flag
108TESTING_MODE = False
109
110# Skip git operations flag
111SKIP_GIT = False
112
113# Synthesis message tracking
114last_synthesis_time = time.time()
115
116# Database for notification tracking
117NOTIFICATION_DB = None
118
119def export_agent_state(client, agent, skip_git=False):
120 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
121 try:
122 # Confirm export with user unless git is being skipped
123 if not skip_git:
124 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
125 if response not in ['y', 'yes']:
126 logger.info("Agent export cancelled by user.")
127 return
128 else:
129 logger.info("Exporting agent state (git staging disabled)")
130
131 # Create directories if they don't exist
132 os.makedirs("agent_archive", exist_ok=True)
133 os.makedirs("agents", exist_ok=True)
134
135 # Export agent data
136 logger.info(f"Exporting agent {agent.id}. This takes some time...")
137 agent_data = client.agents.export_file(agent_id=agent.id)
138
139 # Save timestamped archive copy
140 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
141 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
142 with open(archive_file, 'w', encoding='utf-8') as f:
143 json.dump(agent_data, f, indent=2, ensure_ascii=False)
144
145 # Save current agent state
146 current_file = os.path.join("agents", "void.af")
147 with open(current_file, 'w', encoding='utf-8') as f:
148 json.dump(agent_data, f, indent=2, ensure_ascii=False)
149
150 logger.info(f"Agent exported to {archive_file} and {current_file}")
151
152 # Git add only the current agent file (archive is ignored) unless skip_git is True
153 if not skip_git:
154 try:
155 subprocess.run(["git", "add", current_file], check=True, capture_output=True)
156 logger.info("Added current agent file to git staging")
157 except subprocess.CalledProcessError as e:
158 logger.warning(f"Failed to git add agent file: {e}")
159
160 except Exception as e:
161 logger.error(f"Failed to export agent: {e}")
162
163def initialize_void():
164 logger.info("Starting void agent initialization...")
165
166 # Get the configured void agent by ID
167 logger.info("Loading void agent from config...")
168 from config_loader import get_letta_config
169 letta_config = get_letta_config()
170 agent_id = letta_config['agent_id']
171
172 try:
173 void_agent = CLIENT.agents.retrieve(agent_id=agent_id)
174 logger.info(f"Successfully loaded void agent: {void_agent.name} ({agent_id})")
175 except Exception as e:
176 logger.error(f"Failed to load void agent {agent_id}: {e}")
177 logger.error("Please ensure the agent_id in config.yaml is correct")
178 raise e
179
180 # Export agent state
181 logger.info("Exporting agent state...")
182 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
183
184 # Log agent details
185 logger.info(f"Void agent details - ID: {void_agent.id}")
186 logger.info(f"Agent name: {void_agent.name}")
187 if hasattr(void_agent, 'llm_config'):
188 logger.info(f"Agent model: {void_agent.llm_config.model}")
189 logger.info(f"Agent project_id: {void_agent.project_id}")
190 if hasattr(void_agent, 'tools'):
191 logger.info(f"Agent has {len(void_agent.tools)} tools")
192 for tool in void_agent.tools[:3]: # Show first 3 tools
193 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")
194
195 return void_agent
196
197
198def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
199 """Process a mention and generate a reply using the Letta agent.
200
    Args:
        void_agent: The Letta agent instance
        atproto_client: The AT Protocol client
        notification_data: The notification data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, skip posting to Bluesky and keep queue files on disk

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Failed with non-retryable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
        "ignored": The agent explicitly ignored the notification, delete from queue
212 """
213 import uuid
214
215 # Generate correlation ID for tracking this notification through the pipeline
216 correlation_id = str(uuid.uuid4())[:8]
217
218 try:
219 logger.info(f"[{correlation_id}] Starting process_mention", extra={
220 'correlation_id': correlation_id,
221 'notification_type': type(notification_data).__name__
222 })
223
224 # Handle both dict and object inputs for backwards compatibility
225 if isinstance(notification_data, dict):
226 uri = notification_data['uri']
227 mention_text = notification_data.get('record', {}).get('text', '')
228 author_handle = notification_data['author']['handle']
229 author_name = notification_data['author'].get('display_name') or author_handle
230 else:
231 # Legacy object access
232 uri = notification_data.uri
233 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
234 author_handle = notification_data.author.handle
235 author_name = notification_data.author.display_name or author_handle
236
237 logger.info(f"[{correlation_id}] Processing mention from @{author_handle}", extra={
238 'correlation_id': correlation_id,
239 'author_handle': author_handle,
240 'author_name': author_name,
241 'mention_uri': uri,
242 'mention_text_length': len(mention_text),
243 'mention_preview': mention_text[:100] if mention_text else ''
244 })
245
246 # Retrieve the entire thread associated with the mention
247 try:
248 thread = atproto_client.app.bsky.feed.get_post_thread({
249 'uri': uri,
250 'parent_height': 40,
251 'depth': 10
252 })
253 except Exception as e:
254 error_str = str(e)
255 # Check if this is a NotFound error
256 if 'NotFound' in error_str or 'Post not found' in error_str:
257 logger.warning(f"Post not found for URI {uri}, removing from queue")
258 return True # Return True to remove from queue
259 else:
260 # Re-raise other errors
261 logger.error(f"Error fetching thread: {e}")
262 raise
263
264 # Get thread context as YAML string
265 logger.debug("Converting thread to YAML string")
266 try:
267 thread_context = thread_to_yaml_string(thread)
268 logger.debug(f"Thread context generated, length: {len(thread_context)} characters")
269
270 # Check if #voidstop appears anywhere in the thread
271 if "#voidstop" in thread_context.lower():
272 logger.info("Found #voidstop in thread context, skipping this mention")
273 return True # Return True to remove from queue
274
275 # Also check the mention text directly
276 if "#voidstop" in mention_text.lower():
277 logger.info("Found #voidstop in mention text, skipping this mention")
278 return True # Return True to remove from queue
279
280 # Create a more informative preview by extracting meaningful content
281 lines = thread_context.split('\n')
282 meaningful_lines = []
283
284 for line in lines:
285 stripped = line.strip()
286 if not stripped:
287 continue
288
289 # Look for lines with actual content (not just structure)
290 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
291 meaningful_lines.append(line)
292 if len(meaningful_lines) >= 5:
293 break
294
295 if meaningful_lines:
296 preview = '\n'.join(meaningful_lines)
297 logger.debug(f"Thread content preview:\n{preview}")
298 else:
299 # If no content fields found, just show it's a thread structure
300 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
301 except Exception as yaml_error:
302 import traceback
303 logger.error(f"Error converting thread to YAML: {yaml_error}")
304 logger.error(f"Full traceback:\n{traceback.format_exc()}")
305 logger.error(f"Thread type: {type(thread)}")
306 if hasattr(thread, '__dict__'):
307 logger.error(f"Thread attributes: {thread.__dict__}")
308 # Try to continue with a simple context
309 thread_context = f"Error processing thread context: {str(yaml_error)}"
310
311 # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

To reply, use the add_post_to_bluesky_reply_thread tool:
- Each call creates one post (max 300 characters)
- For most responses, a single call is sufficient
- Only use multiple calls for threaded replies when:
  * The topic requires extended explanation that cannot fit in 300 characters
  * You're explicitly asked for a detailed/long response
  * The conversation naturally benefits from a structured multi-part answer
- Avoid unnecessary threads - be concise when possible"""
332
333 # Extract all handles from notification and thread data
334 all_handles = set()
335 all_handles.update(extract_handles_from_data(notification_data))
336 all_handles.update(extract_handles_from_data(thread.model_dump()))
337 unique_handles = list(all_handles)
338
339 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
340
341 # Check if any handles are in known_bots list
342 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs
343 import json
344
345 try:
346 # Check for known bots in thread
347 bot_check_result = check_known_bots(unique_handles, void_agent)
348 bot_check_data = json.loads(bot_check_result)
349
350 # TEMPORARILY DISABLED: Bot detection causing issues with normal users
351 # TODO: Re-enable after debugging why normal users are being flagged as bots
352 if False: # bot_check_data.get("bot_detected", False):
353 detected_bots = bot_check_data.get("detected_bots", [])
354 logger.info(f"Bot detected in thread: {detected_bots}")
355
356 # Decide whether to respond (10% chance)
357 if not should_respond_to_bot_thread():
358 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}")
359 # Return False to keep in queue for potential later processing
360 return False
361 else:
362 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}")
363 else:
364 logger.debug("Bot detection disabled - processing all notifications")
365
366 except Exception as bot_check_error:
367 logger.warning(f"Error checking for bots: {bot_check_error}")
368 # Continue processing if bot check fails
369
370 # Attach user blocks before agent call
371 attached_handles = []
372 if unique_handles:
373 try:
374 logger.debug(f"Attaching user blocks for handles: {unique_handles}")
375 attach_result = attach_user_blocks(unique_handles, void_agent)
376 attached_handles = unique_handles # Track successfully attached handles
377 logger.debug(f"Attach result: {attach_result}")
378 except Exception as attach_error:
379 logger.warning(f"Failed to attach user blocks: {attach_error}")
380 # Continue without user blocks rather than failing completely
381
382 # Get response from Letta agent
383 # Format with Unicode characters
384 title = f"MENTION FROM @{author_handle}"
385 print(f"\n▶ {title}")
386 print(f" {'═' * len(title)}")
387 # Indent the mention text
388 for line in mention_text.split('\n'):
389 print(f" {line}")
390
391 # Log prompt details to separate logger
392 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
393
394 # Log concise prompt info to main logger
395 thread_handles_count = len(unique_handles)
396 prompt_char_count = len(prompt)
397 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars")
398
399 try:
400 # Use streaming to avoid 524 timeout errors
401 message_stream = CLIENT.agents.messages.create_stream(
402 agent_id=void_agent.id,
403 messages=[{"role": "user", "content": prompt}],
404 stream_tokens=False, # Step streaming only (faster than token streaming)
405 max_steps=100
406 )
407
408 # Collect the streaming response
409 all_messages = []
410 for chunk in message_stream:
411 # Log condensed chunk info
412 if hasattr(chunk, 'message_type'):
413 if chunk.message_type == 'reasoning_message':
414 # Show full reasoning without truncation
415 if SHOW_REASONING:
416 # Format with Unicode characters
417 print("\n◆ Reasoning")
418 print(" ─────────")
419 # Indent reasoning lines
420 for line in chunk.reasoning.split('\n'):
421 print(f" {line}")
422 else:
423 # Default log format (only when --reasoning is used due to log level)
424 # Format with Unicode characters
425 print("\n◆ Reasoning")
426 print(" ─────────")
427 # Indent reasoning lines
428 for line in chunk.reasoning.split('\n'):
429 print(f" {line}")
430
431 # Create ATProto record for reasoning (unless in testing mode)
432 if not testing_mode and hasattr(chunk, 'reasoning'):
433 try:
434 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
435 except Exception as e:
436 logger.debug(f"Failed to create reasoning record: {e}")
437 elif chunk.message_type == 'tool_call_message':
438 # Parse tool arguments for better display
439 tool_name = chunk.tool_call.name
440
441 # Create ATProto record for tool call (unless in testing mode)
442 if not testing_mode:
443 try:
444 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
445 bsky_utils.create_tool_call_record(
446 atproto_client,
447 tool_name,
448 chunk.tool_call.arguments,
449 tool_call_id
450 )
451 except Exception as e:
452 logger.debug(f"Failed to create tool call record: {e}")
453
454 try:
455 args = json.loads(chunk.tool_call.arguments)
456 # Format based on tool type
457 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']:
458 # Extract the text being posted
459 text = args.get('text', '')
460 if text:
461 # Format with Unicode characters
462 print("\n✎ Bluesky Post")
463 print(" ────────────")
464 # Indent post text
465 for line in text.split('\n'):
466 print(f" {line}")
467 else:
468 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
469 elif tool_name == 'archival_memory_search':
470 query = args.get('query', 'unknown')
471 global last_archival_query
472 last_archival_query = query
473 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
474 elif tool_name == 'archival_memory_insert':
475 content = args.get('content', '')
476 # Show the full content being inserted
477 log_with_panel(content, f"Tool call: {tool_name}", "blue")
478 elif tool_name == 'update_block':
479 label = args.get('label', 'unknown')
480 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
481 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
482 else:
483 # Generic display for other tools
484 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
485 if len(args_str) > 150:
486 args_str = args_str[:150] + "..."
487 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
                        except Exception:
489 # Fallback to original format if parsing fails
490 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
491 elif chunk.message_type == 'tool_return_message':
492 # Enhanced tool result logging
493 tool_name = chunk.name
494 status = chunk.status
495
496 if status == 'success':
497 # Try to show meaningful result info based on tool type
498 if hasattr(chunk, 'tool_return') and chunk.tool_return:
499 result_str = str(chunk.tool_return)
500 if tool_name == 'archival_memory_search':
501
502 try:
503 # Handle both string and list formats
504 if isinstance(chunk.tool_return, str):
505 # The string format is: "([{...}, {...}], count)"
506 # We need to extract just the list part
507 if chunk.tool_return.strip():
508 # Find the list part between the first [ and last ]
509 start_idx = chunk.tool_return.find('[')
510 end_idx = chunk.tool_return.rfind(']')
511 if start_idx != -1 and end_idx != -1:
512 list_str = chunk.tool_return[start_idx:end_idx+1]
513 # Use ast.literal_eval since this is Python literal syntax, not JSON
514 import ast
515 results = ast.literal_eval(list_str)
516 else:
517 logger.warning("Could not find list in archival_memory_search result")
518 results = []
519 else:
520 logger.warning("Empty string returned from archival_memory_search")
521 results = []
522 else:
523 # If it's already a list, use directly
524 results = chunk.tool_return
525
526 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green")
527
528 # Use the captured search query from the tool call
529 search_query = last_archival_query
530
531 # Combine all results into a single text block
532 content_text = ""
533 for i, entry in enumerate(results, 1):
534 timestamp = entry.get('timestamp', 'N/A')
535 content = entry.get('content', '')
536 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n"
537
538 # Format with Unicode characters
539 title = f"{search_query} ({len(results)} results)"
540 print(f"\n⚙ {title}")
541 print(f" {'─' * len(title)}")
542 # Indent content text
543 for line in content_text.strip().split('\n'):
544 print(f" {line}")
545
546 except Exception as e:
547 logger.error(f"Error formatting archival memory results: {e}")
548 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green")
549 elif tool_name == 'add_post_to_bluesky_reply_thread':
550 # Just show success for bluesky posts, the text was already shown in tool call
551 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green")
552 elif tool_name == 'archival_memory_insert':
553 # Skip archival memory insert results (always returns None)
554 pass
555 elif tool_name == 'update_block':
556 log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green")
557 else:
558 # Generic success with preview
559 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
560 log_with_panel(preview, f"Tool result: {tool_name} ✓", "green")
561 else:
562 log_with_panel("Success", f"Tool result: {tool_name} ✓", "green")
563 elif status == 'error':
564 # Show error details
565 if tool_name == 'add_post_to_bluesky_reply_thread':
566 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred"
567 log_with_panel(error_str, f"Bluesky Post ✗", "red")
568 elif tool_name == 'archival_memory_insert':
569 # Skip archival memory insert errors too
570 pass
571 else:
572 error_preview = ""
573 if hasattr(chunk, 'tool_return') and chunk.tool_return:
574 error_str = str(chunk.tool_return)
575 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
576 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red")
577 else:
578 log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red")
579 else:
580 logger.info(f"Tool result: {tool_name} - {status}")
581 elif chunk.message_type == 'assistant_message':
582 # Format with Unicode characters
583 print("\n▶ Assistant Response")
584 print(" ──────────────────")
585 # Indent response text
586 for line in chunk.content.split('\n'):
587 print(f" {line}")
588 else:
589 # Filter out verbose message types
590 if chunk.message_type not in ['usage_statistics', 'stop_reason']:
591 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...")
592 else:
593 logger.info(f"📦 Stream status: {chunk}")
594
595 # Log full chunk for debugging
596 logger.debug(f"Full streaming chunk: {chunk}")
597 all_messages.append(chunk)
598 if str(chunk) == 'done':
599 break
600
601 # Convert streaming response to standard format for compatibility
602 message_response = type('StreamingResponse', (), {
603 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
604 })()
605 except Exception as api_error:
606 import traceback
607 error_str = str(api_error)
608 logger.error(f"Letta API error: {api_error}")
609 logger.error(f"Error type: {type(api_error).__name__}")
610 logger.error(f"Full traceback:\n{traceback.format_exc()}")
611 logger.error(f"Mention text was: {mention_text}")
612 logger.error(f"Author: @{author_handle}")
613 logger.error(f"URI: {uri}")
614
615
616 # Try to extract more info from different error types
617 if hasattr(api_error, 'response'):
                logger.error("Error response object exists")
619 if hasattr(api_error.response, 'text'):
620 logger.error(f"Response text: {api_error.response.text}")
621 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
622 try:
623 logger.error(f"Response JSON: {api_error.response.json()}")
                    except Exception:
625 pass
626
627 # Check for specific error types
628 if hasattr(api_error, 'status_code'):
629 logger.error(f"API Status code: {api_error.status_code}")
630 if hasattr(api_error, 'body'):
631 logger.error(f"API Response body: {api_error.body}")
632 if hasattr(api_error, 'headers'):
633 logger.error(f"API Response headers: {api_error.headers}")
634
635 if api_error.status_code == 413:
636 logger.error("413 Payload Too Large - moving to errors directory")
637 return None # Move to errors directory - payload is too large to ever succeed
638 elif api_error.status_code == 524:
639 logger.error("524 error - timeout from Cloudflare, will retry later")
640 return False # Keep in queue for retry
641
642 # Check if error indicates we should remove from queue
643 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
644 logger.warning("Payload too large error, moving to errors directory")
645 return None # Move to errors directory - cannot be fixed by retry
646 elif 'status_code: 524' in error_str:
647 logger.warning("524 timeout error, keeping in queue for retry")
648 return False # Keep in queue for retry
649
650 raise
651
652 # Log successful response
653 logger.debug("Successfully received response from Letta API")
654 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
655
656 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
657 reply_candidates = []
658 tool_call_results = {} # Map tool_call_id to status
659 ack_note = None # Track any note from annotate_ack tool
660
661 logger.debug(f"Processing {len(message_response.messages)} response messages...")
662
663 # First pass: collect tool return statuses
664 ignored_notification = False
665 ignore_reason = ""
666 ignore_category = ""
667
668 for message in message_response.messages:
669 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
670 if message.name == 'add_post_to_bluesky_reply_thread':
671 tool_call_results[message.tool_call_id] = message.status
672 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}")
673 elif message.name == 'ignore_notification':
674 # Check if the tool was successful
675 if hasattr(message, 'tool_return') and message.status == 'success':
676 # Parse the return value to extract category and reason
677 result_str = str(message.tool_return)
678 if 'IGNORED_NOTIFICATION::' in result_str:
679 parts = result_str.split('::')
680 if len(parts) >= 3:
681 ignore_category = parts[1]
682 ignore_reason = parts[2]
683 ignored_notification = True
684 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
685 elif message.name == 'bluesky_reply':
686 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
687 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
688 logger.error("Update the agent's tools using register_tools.py")
689 # Export agent state before terminating
690 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
691 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
692 exit(1)
693
694 # Second pass: process messages and check for successful tool calls
695 for i, message in enumerate(message_response.messages, 1):
696 # Log concise message info instead of full object
697 msg_type = getattr(message, 'message_type', 'unknown')
698 if hasattr(message, 'reasoning') and message.reasoning:
699 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
700 elif hasattr(message, 'tool_call') and message.tool_call:
701 tool_name = message.tool_call.name
702 logger.debug(f" {i}. {msg_type}: {tool_name}")
703 elif hasattr(message, 'tool_return'):
704 tool_name = getattr(message, 'name', 'unknown_tool')
705 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
706 status = getattr(message, 'status', 'unknown')
707 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
708 elif hasattr(message, 'text'):
709 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
710 else:
711 logger.debug(f" {i}. {msg_type}: <no content>")
712
713 # Check for halt_activity tool call
714 if hasattr(message, 'tool_call') and message.tool_call:
715 if message.tool_call.name == 'halt_activity':
716 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
717 try:
718 args = json.loads(message.tool_call.arguments)
719 reason = args.get('reason', 'Agent requested halt')
720 logger.info(f"Halt reason: {reason}")
                    except Exception:
722 logger.info("Halt reason: <unable to parse>")
723
724 # Delete the queue file before terminating
725 if queue_filepath and queue_filepath.exists():
726 queue_filepath.unlink()
727 logger.info(f"Deleted queue file: {queue_filepath.name}")
728
729 # Also mark as processed to avoid reprocessing
730 if NOTIFICATION_DB:
731 NOTIFICATION_DB.mark_processed(notification_data.get('uri', ''), status='processed')
732 else:
733 processed_uris = load_processed_notifications()
734 processed_uris.add(notification_data.get('uri', ''))
735 save_processed_notifications(processed_uris)
736
737 # Export agent state before terminating
738 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
739
740 # Exit the program
741 logger.info("=== BOT TERMINATED BY AGENT ===")
742 exit(0)
743
744 # Check for deprecated bluesky_reply tool
745 if hasattr(message, 'tool_call') and message.tool_call:
746 if message.tool_call.name == 'bluesky_reply':
747 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
748 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
749 logger.error("Update the agent's tools using register_tools.py")
750 # Export agent state before terminating
751 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
752 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
753 exit(1)
754
755 # Collect annotate_ack tool calls
756 elif message.tool_call.name == 'annotate_ack':
757 try:
758 args = json.loads(message.tool_call.arguments)
759 note = args.get('note', '')
760 if note:
761 ack_note = note
762 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
763 except json.JSONDecodeError as e:
764 logger.error(f"Failed to parse annotate_ack arguments: {e}")
765
766 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
767 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
768 tool_call_id = message.tool_call.tool_call_id
769 tool_status = tool_call_results.get(tool_call_id, 'unknown')
770
771 if tool_status == 'success':
772 try:
773 args = json.loads(message.tool_call.arguments)
774 reply_text = args.get('text', '')
775 reply_lang = args.get('lang', 'en-US')
776
777 if reply_text: # Only add if there's actual content
778 reply_candidates.append((reply_text, reply_lang))
779 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
780 except json.JSONDecodeError as e:
781 logger.error(f"Failed to parse tool call arguments: {e}")
782 elif tool_status == 'error':
783 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
784 else:
785 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
786
787 # Check for conflicting tool calls
788 if reply_candidates and ignored_notification:
789 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
790 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
791 logger.warning("Item will be left in queue for manual review")
792 # Return False to keep in queue
793 return False
794
795 if reply_candidates:
796 # Aggregate reply posts into a thread
797 reply_messages = []
798 reply_langs = []
799 for text, lang in reply_candidates:
800 reply_messages.append(text)
801 reply_langs.append(lang)
802
803 # Use the first language for the entire thread (could be enhanced later)
804 reply_lang = reply_langs[0] if reply_langs else 'en-US'
805
806 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
807
808 # Display the generated reply thread
809 if len(reply_messages) == 1:
810 content = reply_messages[0]
811 title = f"Reply to @{author_handle}"
812 else:
813 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)])
814 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)"
815
816 # Format with Unicode characters
817 print(f"\n✎ {title}")
818 print(f" {'─' * len(title)}")
819 # Indent content lines
820 for line in content.split('\n'):
821 print(f" {line}")
822
823 # Send the reply(s) with language (unless in testing mode)
824 if testing_mode:
825 logger.info("TESTING MODE: Skipping actual Bluesky post")
826 response = True # Simulate success
827 else:
828 if len(reply_messages) == 1:
829 # Single reply - use existing function
830 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
831 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
832 response = bsky_utils.reply_to_notification(
833 client=atproto_client,
834 notification=notification_data,
835 reply_text=cleaned_text,
836 lang=reply_lang,
837 correlation_id=correlation_id
838 )
839 else:
840 # Multiple replies - use new threaded function
841 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
842 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
843 response = bsky_utils.reply_with_thread_to_notification(
844 client=atproto_client,
845 notification=notification_data,
846 reply_messages=cleaned_messages,
847 lang=reply_lang,
848 correlation_id=correlation_id
849 )
850
851 if response:
852 logger.info(f"[{correlation_id}] Successfully replied to @{author_handle}", extra={
853 'correlation_id': correlation_id,
854 'author_handle': author_handle,
855 'reply_count': len(reply_messages)
856 })
857
858 # Acknowledge the post we're replying to with stream.thought.ack
859 try:
860 post_uri = notification_data.get('uri')
861 post_cid = notification_data.get('cid')
862
863 if post_uri and post_cid:
864 ack_result = bsky_utils.acknowledge_post(
865 client=atproto_client,
866 post_uri=post_uri,
867 post_cid=post_cid,
868 note=ack_note
869 )
870 if ack_result:
871 if ack_note:
872 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")")
873 else:
874 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack")
875 else:
876 logger.warning(f"Failed to acknowledge post from @{author_handle}")
877 else:
878 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}")
879 except Exception as e:
880 logger.error(f"Error acknowledging post from @{author_handle}: {e}")
881 # Don't fail the entire operation if acknowledgment fails
882
883 return True
884 else:
885 logger.error(f"Failed to send reply to @{author_handle}")
886 return False
887 else:
888 # Check if notification was explicitly ignored
889 if ignored_notification:
890 logger.info(f"[{correlation_id}] Notification from @{author_handle} was explicitly ignored (category: {ignore_category})", extra={
891 'correlation_id': correlation_id,
892 'author_handle': author_handle,
893 'ignore_category': ignore_category
894 })
895 return "ignored"
896 else:
897 logger.warning(f"[{correlation_id}] No reply generated for mention from @{author_handle}, moving to no_reply folder", extra={
898 'correlation_id': correlation_id,
899 'author_handle': author_handle
900 })
901 return "no_reply"
902
903 except Exception as e:
904 logger.error(f"[{correlation_id}] Error processing mention: {e}", extra={
905 'correlation_id': correlation_id,
906 'error': str(e),
907 'error_type': type(e).__name__,
908 'author_handle': author_handle if 'author_handle' in locals() else 'unknown'
909 })
910 return False
911 finally:
912 # Detach user blocks after agent response (success or failure)
913 if 'attached_handles' in locals() and attached_handles:
914 try:
915 logger.info(f"Detaching user blocks for handles: {attached_handles}")
916 detach_result = detach_user_blocks(attached_handles, void_agent)
917 logger.debug(f"Detach result: {detach_result}")
918 except Exception as detach_error:
919 logger.warning(f"Failed to detach user blocks: {detach_error}")
920
921
922def notification_to_dict(notification):
923 """Convert a notification object to a dictionary for JSON serialization."""
924 return {
925 'uri': notification.uri,
926 'cid': notification.cid,
927 'reason': notification.reason,
928 'is_read': notification.is_read,
929 'indexed_at': notification.indexed_at,
930 'author': {
931 'handle': notification.author.handle,
932 'display_name': notification.author.display_name,
933 'did': notification.author.did
934 },
935 'record': {
936 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
937 }
938 }
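
# Illustrative return value (all field values are made up):
#   {'uri': 'at://did:plc:abc123/app.bsky.feed.post/3kxyz', 'cid': 'bafyrei...', 'reason': 'mention',
#    'is_read': False, 'indexed_at': '2025-01-01T09:30:00.000Z',
#    'author': {'handle': 'alice.bsky.social', 'display_name': 'Alice', 'did': 'did:plc:abc123'},
#    'record': {'text': '@void hello'}}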
939
940
941def load_processed_notifications():
942 """Load the set of processed notification URIs from database."""
943 global NOTIFICATION_DB
944 if NOTIFICATION_DB:
945 return NOTIFICATION_DB.get_processed_uris(limit=MAX_PROCESSED_NOTIFICATIONS)
946 return set()
947
948
949def save_processed_notifications(processed_set):
950 """Save the set of processed notification URIs to database."""
951 # This is now handled by marking individual notifications in the DB
952 # Keeping function for compatibility but it doesn't need to do anything
953 pass
954
955
956def save_notification_to_queue(notification, is_priority=None):
957 """Save a notification to the queue directory with priority-based filename."""
958 try:
959 global NOTIFICATION_DB
960
961 # Handle both notification objects and dicts
962 if isinstance(notification, dict):
963 notif_dict = notification
964 notification_uri = notification.get('uri')
965 else:
966 notif_dict = notification_to_dict(notification)
967 notification_uri = notification.uri
968
969 # Check if already processed (using database if available)
970 if NOTIFICATION_DB:
971 if NOTIFICATION_DB.is_processed(notification_uri):
972 logger.debug(f"Notification already processed (DB): {notification_uri}")
973 return False
974 # Add to database - if this fails, don't queue the notification
975 if not NOTIFICATION_DB.add_notification(notif_dict):
976 logger.warning(f"Failed to add notification to database, skipping: {notification_uri}")
977 return False
978 else:
979 # Fall back to old JSON method
980 processed_uris = load_processed_notifications()
981 if notification_uri in processed_uris:
982 logger.debug(f"Notification already processed: {notification_uri}")
983 return False
984
985 # Create JSON string
986 notif_json = json.dumps(notif_dict, sort_keys=True)
987
988 # Generate hash for filename (to avoid duplicates)
989 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
990
991 # Determine priority based on author handle or explicit priority
992 if is_priority is not None:
993 priority_prefix = "0_" if is_priority else "1_"
994 else:
995 if isinstance(notification, dict):
996 author_handle = notification.get('author', {}).get('handle', '')
997 else:
998 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
999 # Prioritize cameron.pfiffer.org responses
1000 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"
1001
1002 # Create filename with priority, timestamp and hash
1003 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1004 reason = notif_dict.get('reason', 'unknown')
1005 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
1006 filepath = QUEUE_DIR / filename
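        # Example resulting filename (illustrative values):
        #   "0_20250101_093000_mention_a1b2c3d4e5f67890.json"
        #   i.e. <priority prefix>_<timestamp>_<reason>_<16-char sha256 prefix>.json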
1007
1008 # Check if this notification URI is already in the queue
1009 for existing_file in QUEUE_DIR.glob("*.json"):
1010 if existing_file.name == "processed_notifications.json":
1011 continue
1012 try:
1013 with open(existing_file, 'r') as f:
1014 existing_data = json.load(f)
1015 if existing_data.get('uri') == notification_uri:
1016 logger.debug(f"Notification already queued (URI: {notification_uri})")
1017 return False
            except Exception:
1019 continue
1020
1021 # Write to file
1022 with open(filepath, 'w') as f:
1023 json.dump(notif_dict, f, indent=2)
1024
1025 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
1026 logger.info(f"Queued notification ({priority_label}): {filename}")
1027 return True
1028
1029 except Exception as e:
1030 logger.error(f"Error saving notification to queue: {e}")
1031 return False
1032
1033
1034def load_and_process_queued_notifications(void_agent, atproto_client, testing_mode=False):
1035 """Load and process all notifications from the queue in priority order."""
1036 try:
1037 # Get all JSON files in queue directory (excluding processed_notifications.json)
1038 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
1039 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1040
1041 # Filter out and delete like notifications immediately
1042 queue_files = []
1043 likes_deleted = 0
1044
1045 for filepath in all_queue_files:
1046 try:
1047 with open(filepath, 'r') as f:
1048 notif_data = json.load(f)
1049
1050 # If it's a like, delete it immediately and don't process
1051 if notif_data.get('reason') == 'like':
1052 filepath.unlink()
1053 likes_deleted += 1
1054 logger.debug(f"Deleted like notification: {filepath.name}")
1055 else:
1056 queue_files.append(filepath)
1057 except Exception as e:
1058 logger.warning(f"Error checking notification file {filepath.name}: {e}")
1059 queue_files.append(filepath) # Keep it in case it's valid
1060
1061 if likes_deleted > 0:
1062 logger.info(f"Deleted {likes_deleted} like notifications from queue")
1063
1064 if not queue_files:
1065 return
1066
1067 logger.info(f"Processing {len(queue_files)} queued notifications")
1068
1069 # Log current statistics
1070 elapsed_time = time.time() - start_time
1071 total_messages = sum(message_counters.values())
1072 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1073
1074 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
1075
1076 for i, filepath in enumerate(queue_files, 1):
1077 # Determine if this is a priority notification
1078 is_priority = filepath.name.startswith("0_")
1079
1080 # Check for new notifications periodically during queue processing
1081 # Also check immediately after processing each priority item
1082 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
1083
1084 # If we just processed a priority item, immediately check for new priority notifications
1085 if is_priority and i > 1:
1086 should_check_notifications = True
1087
1088 if should_check_notifications:
1089 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
1090 try:
1091 # Fetch and queue new notifications without processing them
1092 new_count = fetch_and_queue_new_notifications(atproto_client)
1093
1094 if new_count > 0:
1095 logger.info(f"Added {new_count} new notifications to queue")
1096 # Reload the queue files to include the new items
1097 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1098 queue_files = updated_queue_files
1099 logger.info(f"Queue updated: now {len(queue_files)} total items")
1100 except Exception as e:
1101 logger.error(f"Error checking for new notifications: {e}")
1102
1103 priority_label = " [PRIORITY]" if is_priority else ""
1104 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
1105 try:
1106 # Load notification data
1107 with open(filepath, 'r') as f:
1108 notif_data = json.load(f)
1109
1110 # Process based on type using dict data directly
1111 success = False
1112 if notif_data['reason'] == "mention":
1113 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1114 if success:
1115 message_counters['mentions'] += 1
1116 elif notif_data['reason'] == "reply":
1117 success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1118 if success:
1119 message_counters['replies'] += 1
1120 elif notif_data['reason'] == "follow":
1121 author_handle = notif_data['author']['handle']
1122 author_display_name = notif_data['author'].get('display_name', 'no display name')
1123 follow_update = f"@{author_handle} ({author_display_name}) started following you."
1124 follow_message = f"Update: {follow_update}"
1125 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
1126 CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": follow_message}]
1129 )
1130 success = True # Follow updates are always successful
1131 if success:
1132 message_counters['follows'] += 1
1133 elif notif_data['reason'] == "repost":
1134 # Skip reposts silently
1135 success = True # Skip reposts but mark as successful to remove from queue
1136 if success:
1137 message_counters['reposts_skipped'] += 1
1138 elif notif_data['reason'] == "like":
1139 # Skip likes silently
1140 success = True # Skip likes but mark as successful to remove from queue
1141 if success:
1142 message_counters.setdefault('likes_skipped', 0)
1143 message_counters['likes_skipped'] += 1
1144 else:
1145 logger.warning(f"Unknown notification type: {notif_data['reason']}")
1146 success = True # Remove unknown types from queue
1147
1148 # Handle file based on processing result
                if success is True:  # only a literal True means fully processed; string results are handled below
1150 if testing_mode:
1151 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
1152 else:
1153 filepath.unlink()
1154 logger.info(f"Successfully processed and removed: {filepath.name}")
1155
1156 # Mark as processed to avoid reprocessing
1157 if NOTIFICATION_DB:
1158 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='processed')
1159 else:
1160 processed_uris = load_processed_notifications()
1161 processed_uris.add(notif_data['uri'])
1162 save_processed_notifications(processed_uris)
1163
1164 elif success is None: # Special case for moving to error directory
1165 error_path = QUEUE_ERROR_DIR / filepath.name
1166 filepath.rename(error_path)
1167 logger.warning(f"Moved {filepath.name} to errors directory")
1168
1169 # Also mark as processed to avoid retrying
1170 if NOTIFICATION_DB:
1171 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1172 else:
1173 processed_uris = load_processed_notifications()
1174 processed_uris.add(notif_data['uri'])
1175 save_processed_notifications(processed_uris)
1176
1177 elif success == "no_reply": # Special case for moving to no_reply directory
1178 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
1179 filepath.rename(no_reply_path)
1180 logger.info(f"Moved {filepath.name} to no_reply directory")
1181
1182 # Also mark as processed to avoid retrying
1183 if NOTIFICATION_DB:
1184 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1185 else:
1186 processed_uris = load_processed_notifications()
1187 processed_uris.add(notif_data['uri'])
1188 save_processed_notifications(processed_uris)
1189
1190 elif success == "ignored": # Special case for explicitly ignored notifications
1191 # For ignored notifications, we just delete them (not move to no_reply)
1192 filepath.unlink()
1193 logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
1194
1195 # Also mark as processed to avoid retrying
1196 if NOTIFICATION_DB:
1197 NOTIFICATION_DB.mark_processed(notif_data['uri'], status='error')
1198 else:
1199 processed_uris = load_processed_notifications()
1200 processed_uris.add(notif_data['uri'])
1201 save_processed_notifications(processed_uris)
1202
1203 else:
1204 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
1205
1206 except Exception as e:
1207 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
1208 # Keep the file for retry later
1209
1210 except Exception as e:
1211 logger.error(f"Error loading queued notifications: {e}")
1212
1213
1214def fetch_and_queue_new_notifications(atproto_client):
1215 """Fetch new notifications and queue them without processing."""
1216 try:
1217 global NOTIFICATION_DB
1218
1219 # Get current time for marking notifications as seen
1220 logger.debug("Getting current time for notification marking...")
1221 last_seen_at = atproto_client.get_current_time_iso()
1222
1223 # Get timestamp of last processed notification for filtering
1224 last_processed_time = None
1225 if NOTIFICATION_DB:
1226 last_processed_time = NOTIFICATION_DB.get_latest_processed_time()
1227 if last_processed_time:
1228 logger.debug(f"Last processed notification was at: {last_processed_time}")
1229
1230 # Fetch ALL notifications using pagination
1231 all_notifications = []
1232 cursor = None
1233 page_count = 0
1234 max_pages = 20 # Safety limit to prevent infinite loops
1235
1236 while page_count < max_pages:
1237 try:
1238 # Fetch notifications page
1239 if cursor:
1240 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1241 params={'cursor': cursor, 'limit': 100}
1242 )
1243 else:
1244 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1245 params={'limit': 100}
1246 )
1247
1248 page_count += 1
1249 page_notifications = notifications_response.notifications
1250
1251 if not page_notifications:
1252 break
1253
1254 all_notifications.extend(page_notifications)
1255
1256 # Check if there are more pages
1257 cursor = getattr(notifications_response, 'cursor', None)
1258 if not cursor:
1259 break
1260
1261 except Exception as e:
1262 logger.error(f"Error fetching notifications page {page_count}: {e}")
1263 break
1264
1265 # Now process all fetched notifications
1266 new_count = 0
1267 if all_notifications:
1268 logger.info(f"📥 Fetched {len(all_notifications)} total notifications from API")
1269
1270 # Mark as seen first
1271 try:
1272 atproto_client.app.bsky.notification.update_seen(
1273 data={'seenAt': last_seen_at}
1274 )
1275 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
1276 except Exception as e:
1277 logger.error(f"Error marking notifications as seen: {e}")
1278
1279 # Debug counters
1280 skipped_read = 0
1281 skipped_likes = 0
1282 skipped_processed = 0
1283 skipped_old_timestamp = 0
1284 processed_uris = load_processed_notifications()
1285
1286 # Queue all new notifications (except likes)
1287 for notif in all_notifications:
1288 # Skip if older than last processed (when we have timestamp filtering)
1289 if last_processed_time and hasattr(notif, 'indexed_at'):
1290 if notif.indexed_at <= last_processed_time:
1291 skipped_old_timestamp += 1
1292 logger.debug(f"Skipping old notification (indexed_at {notif.indexed_at} <= {last_processed_time})")
1293 continue
1294
1295 # Debug: Log is_read status but DON'T skip based on it
1296 if hasattr(notif, 'is_read') and notif.is_read:
1297 skipped_read += 1
1298 logger.debug(f"Notification has is_read=True (but processing anyway): {notif.uri if hasattr(notif, 'uri') else 'unknown'}")
1299
1300 # Skip likes
1301 if hasattr(notif, 'reason') and notif.reason == 'like':
1302 skipped_likes += 1
1303 continue
1304
1305 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif
1306
1307 # Skip likes in dict form too
1308 if notif_dict.get('reason') == 'like':
1309 continue
1310
1311 # Check if already processed
1312 notif_uri = notif_dict.get('uri', '')
1313 if notif_uri in processed_uris:
1314 skipped_processed += 1
1315 logger.debug(f"Skipping already processed: {notif_uri}")
1316 continue
1317
1318 # Check if it's a priority notification
1319 is_priority = False
1320
1321 # Priority for cameron.pfiffer.org notifications
1322 author_handle = notif_dict.get('author', {}).get('handle', '')
1323 if author_handle == "cameron.pfiffer.org":
1324 is_priority = True
1325
1326 # Also check for priority keywords in mentions
1327 if notif_dict.get('reason') == 'mention':
1328 # Get the mention text to check for priority keywords
1329 record = notif_dict.get('record', {})
1330 text = record.get('text', '')
1331 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']):
1332 is_priority = True
1333
1334 if save_notification_to_queue(notif_dict, is_priority=is_priority):
1335 new_count += 1
1336 logger.debug(f"Queued notification from @{author_handle}: {notif_dict.get('reason', 'unknown')}")
1337
1338 # Log summary of filtering
1339 logger.info(f"📊 Notification processing summary:")
1340 logger.info(f" • Total fetched: {len(all_notifications)}")
1341 logger.info(f" • Had is_read=True: {skipped_read} (not skipped)")
1342 logger.info(f" • Skipped (likes): {skipped_likes}")
1343 logger.info(f" • Skipped (old timestamp): {skipped_old_timestamp}")
1344 logger.info(f" • Skipped (already processed): {skipped_processed}")
1345 logger.info(f" • Queued for processing: {new_count}")
1346 else:
1347 logger.debug("No new notifications to queue")
1348
1349 return new_count
1350
1351 except Exception as e:
1352 logger.error(f"Error fetching and queueing notifications: {e}")
1353 return 0
1354
1355
1356def process_notifications(void_agent, atproto_client, testing_mode=False):
1357 """Fetch new notifications, queue them, and process the queue."""
1358 try:
1359 # Fetch and queue new notifications
1360 new_count = fetch_and_queue_new_notifications(atproto_client)
1361
1362 if new_count > 0:
1363 logger.info(f"Found {new_count} new notifications to process")
1364
1365 # Now process the entire queue (old + new notifications)
1366 load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)
1367
1368 except Exception as e:
1369 logger.error(f"Error processing notifications: {e}")
1370
1371
1372def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
1373 """
1374 Send a synthesis message to the agent every 10 minutes.
1375 This prompts the agent to synthesize its recent experiences.
1376
1377 Args:
1378 client: Letta client
1379 agent_id: Agent ID to send synthesis to
1380 atproto_client: Optional AT Protocol client for posting synthesis results
1381 """
1382 # Track attached temporal blocks for cleanup
1383 attached_temporal_labels = []
1384
1385 try:
1386 logger.info("🧠 Preparing synthesis with temporal journal blocks")
1387
1388 # Attach temporal blocks before synthesis
1389 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
1390 if not success:
1391 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")
1392
1393 # Create enhanced synthesis prompt
1394 today = date.today()
        synthesis_prompt = f"""Time for synthesis and reflection.

You have access to temporal journal blocks for recording your thoughts and experiences:
- void_day_{today.strftime('%Y_%m_%d')}: Today's journal ({today.strftime('%B %d, %Y')})
- void_month_{today.strftime('%Y_%m')}: This month's journal ({today.strftime('%B %Y')})
- void_year_{today.year}: This year's journal ({today.year})

These journal blocks are attached temporarily for this synthesis session. Use them to:
1. Record significant interactions and insights from recent experiences
2. Track patterns in conversations and user behaviors
3. Note your evolving understanding of the digital social environment
4. Reflect on your growth and changes in perspective
5. Document memorable moments or interesting discoveries

The journal entries should be cumulative - add to existing content rather than replacing it.
Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).

After recording in your journals, synthesize your recent experiences into your core memory blocks
(zeitgeist, void-persona, void-humans) as you normally would.

Begin your synthesis and journaling now."""
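        # For example, on 2025-01-01 the prompt above would name the blocks (illustrative date):
        #   void_day_2025_01_01, void_month_2025_01, void_year_2025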
1416
1417 logger.info("🧠 Sending enhanced synthesis prompt to agent")
1418
1419 # Send synthesis message with streaming to show tool use
1420 message_stream = client.agents.messages.create_stream(
1421 agent_id=agent_id,
1422 messages=[{"role": "user", "content": synthesis_prompt}],
1423 stream_tokens=False,
1424 max_steps=100
1425 )
1426
1427 # Track synthesis content for potential posting
1428 synthesis_posts = []
1429 ack_note = None
1430
1431 # Process the streaming response
1432 for chunk in message_stream:
1433 if hasattr(chunk, 'message_type'):
1434 if chunk.message_type == 'reasoning_message':
1435 if SHOW_REASONING:
1436 print("\n◆ Reasoning")
1437 print(" ─────────")
1438 for line in chunk.reasoning.split('\n'):
1439 print(f" {line}")
1440
1441 # Create ATProto record for reasoning (if we have atproto client)
1442 if atproto_client and hasattr(chunk, 'reasoning'):
1443 try:
1444 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
1445 except Exception as e:
1446 logger.debug(f"Failed to create reasoning record during synthesis: {e}")
1447 elif chunk.message_type == 'tool_call_message':
1448 tool_name = chunk.tool_call.name
1449
1450 # Create ATProto record for tool call (if we have atproto client)
1451 if atproto_client:
1452 try:
1453                            tool_call_id = getattr(chunk.tool_call, 'tool_call_id', None)
1454 bsky_utils.create_tool_call_record(
1455 atproto_client,
1456 tool_name,
1457 chunk.tool_call.arguments,
1458 tool_call_id
1459 )
1460 except Exception as e:
1461 logger.debug(f"Failed to create tool call record during synthesis: {e}")
1462 try:
1463 args = json.loads(chunk.tool_call.arguments)
1464 if tool_name == 'archival_memory_search':
1465 query = args.get('query', 'unknown')
1466 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
1467 elif tool_name == 'archival_memory_insert':
1468 content = args.get('content', '')
1469 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
1470 elif tool_name == 'update_block':
1471 label = args.get('label', 'unknown')
1472                            value_str = str(args.get('value', ''))
                            value_preview = value_str[:100] + "..." if len(value_str) > 100 else value_str
1473 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
1474 elif tool_name == 'annotate_ack':
1475 note = args.get('note', '')
1476 if note:
1477 ack_note = note
1478 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
1479 elif tool_name == 'add_post_to_bluesky_reply_thread':
1480 text = args.get('text', '')
1481 synthesis_posts.append(text)
1482 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
1483 else:
1484 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
1485 if len(args_str) > 150:
1486 args_str = args_str[:150] + "..."
1487                            log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
1488                    except Exception:
1489                        # Fall back to the raw argument string if JSON parsing or key access fails
                        raw_args = str(chunk.tool_call.arguments)
                        log_with_panel(raw_args[:150] + "..." if len(raw_args) > 150 else raw_args, f"Tool call: {tool_name}", "blue")
1490 elif chunk.message_type == 'tool_return_message':
1491 if chunk.status == 'success':
1492 log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green")
1493 else:
1494 log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red")
1495 elif chunk.message_type == 'assistant_message':
1496 print("\n▶ Synthesis Response")
1497 print(" ──────────────────")
1498 for line in chunk.content.split('\n'):
1499 print(f" {line}")
1500
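            # The stream is assumed to end with a chunk whose string form is 'done'; stop reading there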
1501 if str(chunk) == 'done':
1502 break
1503
1504 logger.info("🧠 Synthesis message processed successfully")
1505
1506 # Handle synthesis acknowledgments if we have an atproto client
1507 if atproto_client and ack_note:
1508 try:
1509 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
1510 if result:
1511 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
1512 else:
1513 logger.warning("Failed to create synthesis acknowledgment")
1514 except Exception as e:
1515 logger.error(f"Error creating synthesis acknowledgment: {e}")
1516
1517 # Handle synthesis posts if any were generated
1518 if atproto_client and synthesis_posts:
1519 try:
1520 for post_text in synthesis_posts:
1521 cleaned_text = bsky_utils.remove_outside_quotes(post_text)
1522 response = bsky_utils.send_post(atproto_client, cleaned_text)
1523 if response:
1524 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
1525 else:
1526 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
1527 except Exception as e:
1528 logger.error(f"Error posting synthesis content: {e}")
1529
1530 except Exception as e:
1531 logger.error(f"Error sending synthesis message: {e}")
1532 finally:
1533 # Always detach temporal blocks after synthesis
1534 if attached_temporal_labels:
1535 logger.info("🧠 Detaching temporal journal blocks after synthesis")
1536 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels)
1537 if not detach_success:
1538 logger.warning("Some temporal blocks may not have been detached properly")
1539
1540
1541def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
1542 """
1543 Detach all user blocks from the agent to prevent memory bloat.
1544 This should be called periodically to ensure clean state.
1545 """
1546 try:
1547 # Get all blocks attached to the agent
1548 attached_blocks = client.agents.blocks.list(agent_id=agent_id)
1549
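        # Collect every attached block whose label starts with 'user_' (per-user memory blocks)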
1550 user_blocks_to_detach = []
1551 for block in attached_blocks:
1552 if hasattr(block, 'label') and block.label.startswith('user_'):
1553 user_blocks_to_detach.append({
1554 'label': block.label,
1555 'id': block.id
1556 })
1557
1558 if not user_blocks_to_detach:
1559 logger.debug("No user blocks found to detach during periodic cleanup")
1560 return
1561
1562 # Detach each user block
1563 detached_count = 0
1564 for block_info in user_blocks_to_detach:
1565 try:
1566 client.agents.blocks.detach(
1567 agent_id=agent_id,
1568 block_id=str(block_info['id'])
1569 )
1570 detached_count += 1
1571 logger.debug(f"Detached user block: {block_info['label']}")
1572 except Exception as e:
1573 logger.warning(f"Failed to detach block {block_info['label']}: {e}")
1574
1575 if detached_count > 0:
1576 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")
1577
1578 except Exception as e:
1579 logger.error(f"Error during periodic user block cleanup: {e}")
1580
1581
1582def attach_temporal_blocks(client: Letta, agent_id: str) -> tuple:
1583 """
1584 Attach temporal journal blocks (day, month, year) to the agent for synthesis.
1585 Creates blocks if they don't exist.
1586
1587 Returns:
1588 Tuple of (success: bool, attached_labels: list)
1589 """
1590 try:
1591 today = date.today()
1592
1593 # Generate temporal block labels
1594 day_label = f"void_day_{today.strftime('%Y_%m_%d')}"
1595 month_label = f"void_month_{today.strftime('%Y_%m')}"
1596 year_label = f"void_year_{today.year}"
1597
1598 temporal_labels = [day_label, month_label, year_label]
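        # e.g. for 2025-06-01: void_day_2025_06_01, void_month_2025_06, void_year_2025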
1599 attached_labels = []
1600
1601 # Get current blocks attached to agent
1602 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1603 current_block_labels = {block.label for block in current_blocks}
1604 current_block_ids = {str(block.id) for block in current_blocks}
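        # Attachment is checked by label first and then by block ID below, so an
        # already-attached block is never attached twice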
1605
1606 for label in temporal_labels:
1607 try:
1608 # Skip if already attached
1609 if label in current_block_labels:
1610 logger.debug(f"Temporal block already attached: {label}")
1611 attached_labels.append(label)
1612 continue
1613
1614 # Check if block exists globally
1615 blocks = client.blocks.list(label=label)
1616
1617 if blocks and len(blocks) > 0:
1618 block = blocks[0]
1619 # Check if already attached by ID
1620 if str(block.id) in current_block_ids:
1621 logger.debug(f"Temporal block already attached by ID: {label}")
1622 attached_labels.append(label)
1623 continue
1624 else:
1625 # Create new temporal block with appropriate header
1626 if "day" in label:
1627 header = f"# Daily Journal - {today.strftime('%B %d, %Y')}"
1628 initial_content = f"{header}\n\nNo entries yet for today."
1629 elif "month" in label:
1630 header = f"# Monthly Journal - {today.strftime('%B %Y')}"
1631 initial_content = f"{header}\n\nNo entries yet for this month."
1632 else: # year
1633 header = f"# Yearly Journal - {today.year}"
1634 initial_content = f"{header}\n\nNo entries yet for this year."
1635
1636 block = client.blocks.create(
1637 label=label,
1638 value=initial_content,
1639 limit=10000 # Larger limit for journal blocks
1640 )
1641 logger.info(f"Created new temporal block: {label}")
1642
1643 # Attach the block
1644 client.agents.blocks.attach(
1645 agent_id=agent_id,
1646 block_id=str(block.id)
1647 )
1648 attached_labels.append(label)
1649 logger.info(f"Attached temporal block: {label}")
1650
1651 except Exception as e:
1652 # Check for duplicate constraint errors
1653 error_str = str(e)
1654 if "duplicate key value violates unique constraint" in error_str:
1655 logger.debug(f"Temporal block already attached (constraint): {label}")
1656 attached_labels.append(label)
1657 else:
1658 logger.warning(f"Failed to attach temporal block {label}: {e}")
1659
1660 logger.info(f"Temporal blocks attached: {len(attached_labels)}/{len(temporal_labels)}")
        # Report success only if every temporal block ended up attached, so the caller can warn on partial failure
1661        return len(attached_labels) == len(temporal_labels), attached_labels
1662
1663 except Exception as e:
1664 logger.error(f"Error attaching temporal blocks: {e}")
1665 return False, []
1666
1667
1668def detach_temporal_blocks(client: Letta, agent_id: str, labels_to_detach: list = None) -> bool:
1669 """
1670 Detach temporal journal blocks from the agent after synthesis.
1671
1672 Args:
1673 client: Letta client
1674 agent_id: Agent ID
1675 labels_to_detach: Optional list of specific labels to detach.
1676 If None, detaches all temporal blocks.
1677
1678 Returns:
1679 bool: Success status
1680 """
1681 try:
1682 # If no specific labels provided, generate today's labels
1683 if labels_to_detach is None:
1684 today = date.today()
1685 labels_to_detach = [
1686 f"void_day_{today.strftime('%Y_%m_%d')}",
1687 f"void_month_{today.strftime('%Y_%m')}",
1688 f"void_year_{today.year}"
1689 ]
1690
1691 # Get current blocks and build label to ID mapping
1692 current_blocks = client.agents.blocks.list(agent_id=agent_id)
1693 block_label_to_id = {block.label: str(block.id) for block in current_blocks}
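        # The detach call takes a block ID, so map each temporal label to the ID of its attached block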
1694
1695 detached_count = 0
1696 for label in labels_to_detach:
1697 if label in block_label_to_id:
1698 try:
1699 client.agents.blocks.detach(
1700 agent_id=agent_id,
1701 block_id=block_label_to_id[label]
1702 )
1703 detached_count += 1
1704 logger.debug(f"Detached temporal block: {label}")
1705 except Exception as e:
1706 logger.warning(f"Failed to detach temporal block {label}: {e}")
1707 else:
1708 logger.debug(f"Temporal block not attached: {label}")
1709
1710 logger.info(f"Detached {detached_count} temporal blocks")
1711 return True
1712
1713 except Exception as e:
1714 logger.error(f"Error detaching temporal blocks: {e}")
1715 return False
1716
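# Usage sketch for the temporal journal helpers (mirrors how send_synthesis_message uses them):
#   ok, labels = attach_temporal_blocks(CLIENT, agent_id)
#   ...  # run the synthesis prompt while the journal blocks are attached
#   detach_temporal_blocks(CLIENT, agent_id, labels)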
1717
1718def main():
    """Main bot loop that continuously monitors for notifications."""
1719    # Parse command line arguments
1720 parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
1721 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
1722 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
1723 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
1724 # --rich option removed as we now use simple text formatting
1725 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
1726 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
1727 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
1728 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
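    # Typical runs: --test --reasoning for a dry run with visible reasoning,
    # or --synthesis-only --synthesis-interval 1800 for journal-only operation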
1729 args = parser.parse_args()
1730
1731 # Configure logging based on command line arguments
1732 if args.simple_logs:
1733 log_format = "void - %(levelname)s - %(message)s"
1734 else:
1735 # Create custom formatter with symbols
1736 class SymbolFormatter(logging.Formatter):
1737 """Custom formatter that adds symbols for different log levels"""
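            # Example output line: "✓ 14:03:27 │ INFO  │ Connected to Bluesky"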
1738
1739 SYMBOLS = {
1740 logging.DEBUG: '◇',
1741 logging.INFO: '✓',
1742 logging.WARNING: '⚠',
1743 logging.ERROR: '✗',
1744 logging.CRITICAL: '‼'
1745 }
1746
1747 def format(self, record):
1748 # Get the symbol for this log level
1749 symbol = self.SYMBOLS.get(record.levelno, '•')
1750
1751 # Format time as HH:MM:SS
1752 timestamp = self.formatTime(record, "%H:%M:%S")
1753
1754 # Build the formatted message
1755 level_name = f"{record.levelname:<5}" # Left-align, 5 chars
1756
1757 # Use vertical bar as separator
1758 parts = [symbol, timestamp, '│', level_name, '│', record.getMessage()]
1759
1760 return ' '.join(parts)
1761
1762 # Reset logging configuration
1763 for handler in logging.root.handlers[:]:
1764 logging.root.removeHandler(handler)
1765
1766 # Create handler with custom formatter
1767 handler = logging.StreamHandler()
1768 if not args.simple_logs:
1769 handler.setFormatter(SymbolFormatter())
1770 else:
1771 handler.setFormatter(logging.Formatter(log_format))
1772
1773 # Configure root logger
1774 logging.root.setLevel(logging.INFO)
1775 logging.root.addHandler(handler)
1776
1777    global logger, prompt_logger
1778 logger = logging.getLogger("void_bot")
1779 logger.setLevel(logging.INFO)
1780
1781 # Create a separate logger for prompts (set to WARNING to hide by default)
1782 prompt_logger = logging.getLogger("void_bot.prompts")
1783 if args.reasoning:
1784 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used
1785 else:
1786 prompt_logger.setLevel(logging.WARNING) # Hide by default
1787
1788 # Disable httpx logging completely
1789 logging.getLogger("httpx").setLevel(logging.CRITICAL)
1790
1791    # Rich console no longer used - output is plain text
1793
1794 global TESTING_MODE, SKIP_GIT, SHOW_REASONING
1795 TESTING_MODE = args.test
1796
1797 # Store no-git flag globally for use in export_agent_state calls
1798 SKIP_GIT = args.no_git
1799
1800    # Rich formatting no longer used
1802
1803 # Store reasoning flag globally
1804 SHOW_REASONING = args.reasoning
1805
1806 if TESTING_MODE:
1807 logger.info("=== RUNNING IN TESTING MODE ===")
1808 logger.info(" - No messages will be sent to Bluesky")
1809 logger.info(" - Queue files will not be deleted")
1810 logger.info(" - Notifications will not be marked as seen")
1811 print("\n")
1812
1813 # Check for synthesis-only mode
1814 SYNTHESIS_ONLY = args.synthesis_only
1815 if SYNTHESIS_ONLY:
1816 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
1817 logger.info(" - Only synthesis messages will be sent")
1818 logger.info(" - No notification processing")
1819 logger.info(" - No Bluesky client needed")
1820 print("\n")
1822 global start_time
1823 start_time = time.time()
1824 logger.info("=== STARTING VOID BOT ===")
1825 void_agent = initialize_void()
1826 logger.info(f"Void agent initialized: {void_agent.id}")
1827
1828 # Initialize notification database
1829 global NOTIFICATION_DB
1830 logger.info("Initializing notification database...")
1831 NOTIFICATION_DB = NotificationDB()
1832
1833 # Migrate from old JSON format if it exists
1834 if PROCESSED_NOTIFICATIONS_FILE.exists():
1835 logger.info("Found old processed_notifications.json, migrating to database...")
1836 NOTIFICATION_DB.migrate_from_json(str(PROCESSED_NOTIFICATIONS_FILE))
1837
1838 # Log database stats
1839 db_stats = NOTIFICATION_DB.get_stats()
1840 logger.info(f"Database initialized - Total notifications: {db_stats.get('total', 0)}, Recent (24h): {db_stats.get('recent_24h', 0)}")
1841
1842 # Clean up old records
1843 NOTIFICATION_DB.cleanup_old_records(days=7)
1844
1845 # Ensure correct tools are attached for Bluesky
1846 logger.info("Configuring tools for Bluesky platform...")
1847 try:
1848 from tool_manager import ensure_platform_tools
1849 ensure_platform_tools('bluesky', void_agent.id)
1850 except Exception as e:
1851 logger.error(f"Failed to configure platform tools: {e}")
1852 logger.warning("Continuing with existing tool configuration")
1853
1854 # Check if agent has required tools
1855 if hasattr(void_agent, 'tools') and void_agent.tools:
1856 tool_names = [tool.name for tool in void_agent.tools]
1857 # Check for bluesky-related tools
1858 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
1859 if not bluesky_tools:
1860 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
1861 else:
1862 logger.warning("Agent has no tools registered!")
1863
1864 # Clean up all user blocks at startup
1865 logger.info("🧹 Cleaning up user blocks at startup...")
1866 periodic_user_block_cleanup(CLIENT, void_agent.id)
1867
1868 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
1869 if not SYNTHESIS_ONLY:
1870 atproto_client = bsky_utils.default_login()
1871 logger.info("Connected to Bluesky")
1872 else:
1873 # In synthesis-only mode, still connect for acks and posts (unless in test mode)
1874 if not args.test:
1875 atproto_client = bsky_utils.default_login()
1876 logger.info("Connected to Bluesky (for synthesis acks/posts)")
1877 else:
1878 atproto_client = None
1879 logger.info("Skipping Bluesky connection (test mode)")
1880
1881 # Configure intervals
1882 CLEANUP_INTERVAL = args.cleanup_interval
1883 SYNTHESIS_INTERVAL = args.synthesis_interval
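    # CLEANUP_INTERVAL counts main-loop cycles; SYNTHESIS_INTERVAL is wall-clock seconds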
1884
1885 # Synthesis-only mode
1886 if SYNTHESIS_ONLY:
1887 if SYNTHESIS_INTERVAL <= 0:
1888 logger.error("Synthesis-only mode requires --synthesis-interval > 0")
1889 return
1890
1891 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
1892
1893 while True:
1894 try:
1895 # Send synthesis message immediately on first run
1896 logger.info("🧠 Sending synthesis message")
1897 send_synthesis_message(CLIENT, void_agent.id, atproto_client)
1898
1899 # Wait for next interval
1900 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
1901 sleep(SYNTHESIS_INTERVAL)
1902
1903 except KeyboardInterrupt:
1904 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
1905 break
1906 except Exception as e:
1907 logger.error(f"Error in synthesis loop: {e}")
1908 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
1909 sleep(SYNTHESIS_INTERVAL)
1910
1911 # Normal mode with notification processing
1912 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
1913
1914 cycle_count = 0
1915
1916 if CLEANUP_INTERVAL > 0:
1917 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
1918 else:
1919 logger.info("User block cleanup disabled")
1920
1921 if SYNTHESIS_INTERVAL > 0:
1922 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
1923 else:
1924 logger.info("Synthesis messages disabled")
1925
1926 while True:
1927 try:
1928 cycle_count += 1
1929 process_notifications(void_agent, atproto_client, TESTING_MODE)
1930
1931 # Check if synthesis interval has passed
1932 if SYNTHESIS_INTERVAL > 0:
1933 current_time = time.time()
1934 global last_synthesis_time
1935 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
1936 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
1937 send_synthesis_message(CLIENT, void_agent.id, atproto_client)
1938 last_synthesis_time = current_time
1939
1940 # Run periodic cleanup every N cycles
1941 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
1942 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
1943 periodic_user_block_cleanup(CLIENT, void_agent.id)
1944
1945 # Also check database health when doing cleanup
1946 if NOTIFICATION_DB:
1947 db_stats = NOTIFICATION_DB.get_stats()
1948 pending = db_stats.get('status_pending', 0)
1949 errors = db_stats.get('status_error', 0)
1950
1951 if pending > 50:
1952 logger.warning(f"⚠️ Queue health check: {pending} pending notifications (may be stuck)")
1953 if errors > 20:
1954 logger.warning(f"⚠️ Queue health check: {errors} error notifications")
1955
1956 # Periodic cleanup of old records
1957                    if cycle_count % (CLEANUP_INTERVAL * 10) == 0: # Every CLEANUP_INTERVAL * 10 cycles (100 with the default)
1958 logger.info("Running database cleanup of old records...")
1959 NOTIFICATION_DB.cleanup_old_records(days=7)
1960
1961 # Log cycle completion with stats
1962 elapsed_time = time.time() - start_time
1963 total_messages = sum(message_counters.values())
1964 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1965
1966 if total_messages > 0:
1967 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
1968 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
1969
1970 except KeyboardInterrupt:
1971 # Final stats
1972 elapsed_time = time.time() - start_time
1973 total_messages = sum(message_counters.values())
1974 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1975
1976 logger.info("=== BOT STOPPED BY USER ===")
1977 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
1978 logger.info(f" - {message_counters['mentions']} mentions")
1979 logger.info(f" - {message_counters['replies']} replies")
1980 logger.info(f" - {message_counters['follows']} follows")
1981 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
1982 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
1983
1984 # Close database connection
1985 if NOTIFICATION_DB:
1986 logger.info("Closing database connection...")
1987 NOTIFICATION_DB.close()
1988
1989 break
1990 except Exception as e:
1991 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
1992 logger.error(f"Error details: {e}")
1993 # Wait a bit longer on errors
1994 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
1995 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
1996
1997
1998if __name__ == "__main__":
1999 main()