a digital person for bluesky
1# Rich imports removed - using simple text formatting
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14import argparse
15
16from utils import (
17 upsert_block,
18 upsert_agent
19)
20
21import bsky_utils
22from tools.blocks import attach_user_blocks, detach_user_blocks
23from datetime import date
24from temporal_blocks import (
25 attach_temporal_blocks,
26 detach_temporal_blocks,
27 update_temporal_blocks_after_synthesis
28)
29
30def extract_handles_from_data(data):
31 """Recursively extract all unique handles from nested data structure."""
32 handles = set()
33
34 def _extract_recursive(obj):
35 if isinstance(obj, dict):
36 # Check if this dict has a 'handle' key
37 if 'handle' in obj:
38 handles.add(obj['handle'])
39 # Recursively check all values
40 for value in obj.values():
41 _extract_recursive(value)
42 elif isinstance(obj, list):
43 # Recursively check all list items
44 for item in obj:
45 _extract_recursive(item)
46
47 _extract_recursive(data)
48 return list(handles)
49
50# Logging will be configured after argument parsing
51logger = None
52prompt_logger = None
53# Simple text formatting (Rich no longer used)
54SHOW_REASONING = False
55last_archival_query = "archival memory search"
56
57def log_with_panel(message, title=None, border_color="white"):
58 """Log a message with Unicode box-drawing characters"""
59 if title:
60 # Map old color names to appropriate symbols
61 symbol_map = {
62 "blue": "⚙", # Tool calls
63 "green": "✓", # Success/completion
64 "yellow": "◆", # Reasoning
65 "red": "✗", # Errors
66 "white": "▶", # Default/mentions
67 "cyan": "✎", # Posts
68 }
69 symbol = symbol_map.get(border_color, "▶")
70
71 print(f"\n{symbol} {title}")
72 print(f" {'─' * len(title)}")
73 # Indent message lines
74 for line in message.split('\n'):
75 print(f" {line}")
76 else:
77 print(message)
78
79
80# Global variables for Letta client and config
81# These will be initialized in main() with the proper config file
82CLIENT = None
83PROJECT_ID = None
84
85# Notification check delay
86FETCH_NOTIFICATIONS_DELAY_SEC = 10 # Check every 10 seconds for faster response
87
88# Check for new notifications every N queue items
89CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS = 2 # Check more frequently during processing
90
91# Queue directory - will be initialized with agent ID later
92QUEUE_DIR = None
93QUEUE_ERROR_DIR = None
94QUEUE_NO_REPLY_DIR = None
95PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")
96
97# Maximum number of processed notifications to track
98MAX_PROCESSED_NOTIFICATIONS = 10000
99
100# Message tracking counters
101message_counters = defaultdict(int)
102start_time = time.time()
103
104# Testing mode flag
105TESTING_MODE = False
106
107# Skip git operations flag
108SKIP_GIT = False
109
110# Synthesis message tracking
111last_synthesis_time = time.time()
112
113def export_agent_state(client, agent, skip_git=False):
114 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
115 try:
116 # Confirm export with user unless git is being skipped
117 if not skip_git:
118 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
119 if response not in ['y', 'yes']:
120 logger.info("Agent export cancelled by user.")
121 return
122 else:
123 logger.info("Exporting agent state (git staging disabled)")
124
125 # Create directories if they don't exist
126 os.makedirs("agent_archive", exist_ok=True)
127 os.makedirs("agents", exist_ok=True)
128
129 # Export agent data
130 logger.info(f"Exporting agent {agent.id}. This takes some time...")
131 agent_data = client.agents.export_file(agent_id=agent.id)
132
133 # Save timestamped archive copy
134 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
135 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
136 with open(archive_file, 'w', encoding='utf-8') as f:
137 json.dump(agent_data, f, indent=2, ensure_ascii=False)
138
139 # Save current agent state
140 current_file = os.path.join("agents", "void.af")
141 with open(current_file, 'w', encoding='utf-8') as f:
142 json.dump(agent_data, f, indent=2, ensure_ascii=False)
143
144 logger.info(f"Agent exported to {archive_file} and {current_file}")
145
146 # Git add only the current agent file (archive is ignored) unless skip_git is True
147 if not skip_git:
148 try:
149 subprocess.run(["git", "add", current_file], check=True, capture_output=True)
150 logger.info("Added current agent file to git staging")
151 except subprocess.CalledProcessError as e:
152 logger.warning(f"Failed to git add agent file: {e}")
153
154 except Exception as e:
155 logger.error(f"Failed to export agent: {e}")
156
157def initialize_void(config_path="config.yaml"):
158 logger.info("Starting agent initialization...")
159
160 # Get the configured agent by ID
161 logger.info("Loading agent from config...")
162 from config_loader import get_letta_config
163 letta_config = get_letta_config(config_path)
164 agent_id = letta_config['agent_id']
165
166 try:
167 agent = CLIENT.agents.retrieve(agent_id=agent_id)
168 logger.info(f"Successfully loaded agent: {agent.name} ({agent_id})")
169 except Exception as e:
170 logger.error(f"Failed to load agent {agent_id}: {e}")
171 logger.error("Please ensure the agent_id in config.yaml is correct")
172 raise e
173
174 # Initialize agent-specific queue directories
175 global QUEUE_DIR, QUEUE_ERROR_DIR, QUEUE_NO_REPLY_DIR
176 QUEUE_DIR = Path(f"queue/{agent_id}")
177 QUEUE_DIR.mkdir(exist_ok=True, parents=True)
178 QUEUE_ERROR_DIR = QUEUE_DIR / "errors"
179 QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
180 QUEUE_NO_REPLY_DIR = QUEUE_DIR / "no_reply"
181 QUEUE_NO_REPLY_DIR.mkdir(exist_ok=True, parents=True)
182 logger.info(f"Initialized queue directories for agent {agent_id}")
183
184 # Export agent state
185 logger.info("Exporting agent state...")
186 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT)
187
188 # Log agent details
189 logger.info(f"Agent details - ID: {agent.id}")
190 logger.info(f"Agent name: {agent.name}")
191 if hasattr(agent, 'llm_config'):
192 logger.info(f"Agent model: {agent.llm_config.model}")
193 logger.info(f"Agent project_id: {agent.project_id}")
194 if hasattr(agent, 'tools'):
195 logger.info(f"Agent has {len(agent.tools)} tools")
196 for tool in agent.tools[:3]: # Show first 3 tools
197 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")
198
199 # Migrate old queue files if they exist
200 migrate_old_queue_files(agent_id)
201
202 return agent
203
204
205def migrate_old_queue_files(agent_id):
206 """Migrate queue files from old flat structure to agent-scoped directories."""
207 old_queue_dir = Path("queue")
208
209 if not old_queue_dir.exists():
210 return
211
212 # Count old queue files (excluding subdirectories and special files)
213 old_files = [f for f in old_queue_dir.glob("*.json")
214 if f.name != "processed_notifications.json"]
215
216 if not old_files:
217 return
218
219 logger.info(f"Found {len(old_files)} queue files in old location, migrating to agent-specific directory...")
220
221 migrated_count = 0
222 for old_file in old_files:
223 try:
224 # Read the notification to check if it's for this agent
225 with open(old_file, 'r') as f:
226 notif_data = json.load(f)
227
228 # Move to new agent-specific queue directory
229 new_file = QUEUE_DIR / old_file.name
230 old_file.rename(new_file)
231 migrated_count += 1
232 logger.debug(f"Migrated {old_file.name} to {new_file}")
233
234 except Exception as e:
235 logger.warning(f"Failed to migrate {old_file.name}: {e}")
236
237 if migrated_count > 0:
238 logger.info(f"Successfully migrated {migrated_count} queue files to agent-specific directory")
239
240 # Also check and migrate files from old errors and no_reply directories
241 for subdir in ["errors", "no_reply"]:
242 old_subdir = old_queue_dir / subdir
243 if old_subdir.exists():
244 old_subdir_files = list(old_subdir.glob("*.json"))
245 if old_subdir_files:
246 new_subdir = QUEUE_DIR / subdir
247 new_subdir.mkdir(exist_ok=True, parents=True)
248
249 for old_file in old_subdir_files:
250 try:
251 new_file = new_subdir / old_file.name
252 old_file.rename(new_file)
253 logger.debug(f"Migrated {subdir}/{old_file.name}")
254 except Exception as e:
255 logger.warning(f"Failed to migrate {subdir}/{old_file.name}: {e}")
256
257
258def process_mention(agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
259 """Process a mention and generate a reply using the Letta agent.
260
261 Args:
262 agent: The Letta agent instance
263 atproto_client: The AT Protocol client
264 notification_data: The notification data dictionary
265 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
266
267 Returns:
268 True: Successfully processed, remove from queue
269 False: Failed but retryable, keep in queue
270 None: Failed with non-retryable error, move to errors directory
271 "no_reply": No reply was generated, move to no_reply directory
272 """
273 try:
274 logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}")
275
276 # Handle both dict and object inputs for backwards compatibility
277 if isinstance(notification_data, dict):
278 uri = notification_data['uri']
279 mention_text = notification_data.get('record', {}).get('text', '')
280 author_handle = notification_data['author']['handle']
281 author_name = notification_data['author'].get('display_name') or author_handle
282 else:
283 # Legacy object access
284 uri = notification_data.uri
285 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
286 author_handle = notification_data.author.handle
287 author_name = notification_data.author.display_name or author_handle
288
289 logger.debug(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")
290
291 # Retrieve the entire thread associated with the mention
292 try:
293 thread = atproto_client.app.bsky.feed.get_post_thread({
294 'uri': uri,
295 'parent_height': 40,
296 'depth': 10
297 })
298 except Exception as e:
299 error_str = str(e)
300 # Check if this is a NotFound error
301 if 'NotFound' in error_str or 'Post not found' in error_str:
302 logger.warning(f"Post not found for URI {uri}, removing from queue")
303 return True # Return True to remove from queue
304 else:
305 # Re-raise other errors
306 logger.error(f"Error fetching thread: {e}")
307 raise
308
309 # Get thread context as YAML string
310 logger.debug("Converting thread to YAML string")
311 try:
312 thread_context = thread_to_yaml_string(thread)
313 logger.debug(f"Thread context generated, length: {len(thread_context)} characters")
314
315 # Check if #voidstop appears anywhere in the thread
316 if "#voidstop" in thread_context.lower():
317 logger.info("Found #voidstop in thread context, skipping this mention")
318 return True # Return True to remove from queue
319
320 # Also check the mention text directly
321 if "#voidstop" in mention_text.lower():
322 logger.info("Found #voidstop in mention text, skipping this mention")
323 return True # Return True to remove from queue
324
325 # Create a more informative preview by extracting meaningful content
326 lines = thread_context.split('\n')
327 meaningful_lines = []
328
329 for line in lines:
330 stripped = line.strip()
331 if not stripped:
332 continue
333
334 # Look for lines with actual content (not just structure)
335 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
336 meaningful_lines.append(line)
337 if len(meaningful_lines) >= 5:
338 break
339
340 if meaningful_lines:
341 preview = '\n'.join(meaningful_lines)
342 logger.debug(f"Thread content preview:\n{preview}")
343 else:
344 # If no content fields found, just show it's a thread structure
345 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
346 except Exception as yaml_error:
347 import traceback
348 logger.error(f"Error converting thread to YAML: {yaml_error}")
349 logger.error(f"Full traceback:\n{traceback.format_exc()}")
350 logger.error(f"Thread type: {type(thread)}")
351 if hasattr(thread, '__dict__'):
352 logger.error(f"Thread attributes: {thread.__dict__}")
353 # Try to continue with a simple context
354 thread_context = f"Error processing thread context: {str(yaml_error)}"
355
356 # Create a prompt for the Letta agent with thread context
357 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
358
359MOST RECENT POST (the mention you're responding to):
360"{mention_text}"
361
362FULL THREAD CONTEXT:
363```yaml
364{thread_context}
365```
366
367The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.
368
369To reply, use the add_post_to_bluesky_reply_thread tool:
370- Each call creates one post (max 300 characters)
371- For most responses, a single call is sufficient
372- Only use multiple calls for threaded replies when:
373 * The topic requires extended explanation that cannot fit in 300 characters
374 * You're explicitly asked for a detailed/long response
375 * The conversation naturally benefits from a structured multi-part answer
376- Avoid unnecessary threads - be concise when possible"""
377
378 # Extract all handles from notification and thread data
379 all_handles = set()
380 all_handles.update(extract_handles_from_data(notification_data))
381 all_handles.update(extract_handles_from_data(thread.model_dump()))
382 unique_handles = list(all_handles)
383
384 logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
385
386 # Check if any handles are in known_bots list
387 from tools.bot_detection import check_known_bots, should_respond_to_bot_thread, CheckKnownBotsArgs
388 import json
389
390 try:
391 # Check for known bots in thread
392 bot_check_result = check_known_bots(unique_handles, agent)
393 bot_check_data = json.loads(bot_check_result)
394
395 if bot_check_data.get("bot_detected", False):
396 detected_bots = bot_check_data.get("detected_bots", [])
397 logger.info(f"Bot detected in thread: {detected_bots}")
398
399 # Decide whether to respond (10% chance)
400 if not should_respond_to_bot_thread():
401 logger.info(f"Skipping bot thread (90% skip rate). Detected bots: {detected_bots}")
402 # Return False to keep in queue for potential later processing
403 return False
404 else:
405 logger.info(f"Responding to bot thread (10% response rate). Detected bots: {detected_bots}")
406 else:
407 logger.debug("No known bots detected in thread")
408
409 except Exception as bot_check_error:
410 logger.warning(f"Error checking for bots: {bot_check_error}")
411 # Continue processing if bot check fails
412
413 # Attach user blocks before agent call
414 attached_handles = []
415 if unique_handles:
416 try:
417 logger.debug(f"Attaching user blocks for handles: {unique_handles}")
418 attach_result = attach_user_blocks(unique_handles, agent)
419 attached_handles = unique_handles # Track successfully attached handles
420 logger.debug(f"Attach result: {attach_result}")
421 except Exception as attach_error:
422 logger.warning(f"Failed to attach user blocks: {attach_error}")
423 # Continue without user blocks rather than failing completely
424
425 # Get response from Letta agent
426 # Format with Unicode characters
427 title = f"MENTION FROM @{author_handle}"
428 print(f"\n▶ {title}")
429 print(f" {'═' * len(title)}")
430 # Indent the mention text
431 for line in mention_text.split('\n'):
432 print(f" {line}")
433
434 # Log prompt details to separate logger
435 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
436
437 # Log concise prompt info to main logger
438 thread_handles_count = len(unique_handles)
439 prompt_char_count = len(prompt)
440 logger.debug(f"Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users | prompt: {prompt_char_count} chars")
441
442 try:
443 # Use streaming to avoid 524 timeout errors
444 message_stream = CLIENT.agents.messages.create_stream(
445 agent_id=agent.id,
446 messages=[{"role": "user", "content": prompt}],
447 stream_tokens=False, # Step streaming only (faster than token streaming)
448 max_steps=100
449 )
450
451 # Collect the streaming response
452 all_messages = []
453 for chunk in message_stream:
454 # Log condensed chunk info
455 if hasattr(chunk, 'message_type'):
456 if chunk.message_type == 'reasoning_message':
457 # Show full reasoning without truncation
458 if SHOW_REASONING:
459 # Format with Unicode characters
460 print("\n◆ Reasoning")
461 print(" ─────────")
462 # Indent reasoning lines
463 for line in chunk.reasoning.split('\n'):
464 print(f" {line}")
465 else:
466 # Default log format (only when --reasoning is used due to log level)
467 # Format with Unicode characters
468 print("\n◆ Reasoning")
469 print(" ─────────")
470 # Indent reasoning lines
471 for line in chunk.reasoning.split('\n'):
472 print(f" {line}")
473
474 # Create ATProto record for reasoning (unless in testing mode)
475 if not testing_mode and hasattr(chunk, 'reasoning'):
476 try:
477 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
478 except Exception as e:
479 logger.debug(f"Failed to create reasoning record: {e}")
480 elif chunk.message_type == 'tool_call_message':
481 # Parse tool arguments for better display
482 tool_name = chunk.tool_call.name
483
484 # Create ATProto record for tool call (unless in testing mode)
485 if not testing_mode:
486 try:
487 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
488 bsky_utils.create_tool_call_record(
489 atproto_client,
490 tool_name,
491 chunk.tool_call.arguments,
492 tool_call_id
493 )
494 except Exception as e:
495 logger.debug(f"Failed to create tool call record: {e}")
496
497 try:
498 args = json.loads(chunk.tool_call.arguments)
499 # Format based on tool type
500 if tool_name in ['add_post_to_bluesky_reply_thread', 'bluesky_reply']:
501 # Extract the text being posted
502 text = args.get('text', '')
503 if text:
504 # Format with Unicode characters
505 print("\n✎ Bluesky Post")
506 print(" ────────────")
507 # Indent post text
508 for line in text.split('\n'):
509 print(f" {line}")
510 else:
511 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
512 elif tool_name == 'archival_memory_search':
513 query = args.get('query', 'unknown')
514 global last_archival_query
515 last_archival_query = query
516 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
517 elif tool_name == 'archival_memory_insert':
518 content = args.get('content', '')
519 # Show the full content being inserted
520 log_with_panel(content, f"Tool call: {tool_name}", "blue")
521 elif tool_name == 'update_block':
522 label = args.get('label', 'unknown')
523 value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
524 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
525 else:
526 # Generic display for other tools
527 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
528 if len(args_str) > 150:
529 args_str = args_str[:150] + "..."
530 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
531 except:
532 # Fallback to original format if parsing fails
533 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
534 elif chunk.message_type == 'tool_return_message':
535 # Enhanced tool result logging
536 tool_name = chunk.name
537 status = chunk.status
538
539 if status == 'success':
540 # Try to show meaningful result info based on tool type
541 if hasattr(chunk, 'tool_return') and chunk.tool_return:
542 result_str = str(chunk.tool_return)
543 if tool_name == 'archival_memory_search':
544
545 try:
546 # Handle both string and list formats
547 if isinstance(chunk.tool_return, str):
548 # The string format is: "([{...}, {...}], count)"
549 # We need to extract just the list part
550 if chunk.tool_return.strip():
551 # Find the list part between the first [ and last ]
552 start_idx = chunk.tool_return.find('[')
553 end_idx = chunk.tool_return.rfind(']')
554 if start_idx != -1 and end_idx != -1:
555 list_str = chunk.tool_return[start_idx:end_idx+1]
556 # Use ast.literal_eval since this is Python literal syntax, not JSON
557 import ast
558 results = ast.literal_eval(list_str)
559 else:
560 logger.warning("Could not find list in archival_memory_search result")
561 results = []
562 else:
563 logger.warning("Empty string returned from archival_memory_search")
564 results = []
565 else:
566 # If it's already a list, use directly
567 results = chunk.tool_return
568
569 log_with_panel(f"Found {len(results)} memory entries", f"Tool result: {tool_name} ✓", "green")
570
571 # Use the captured search query from the tool call
572 search_query = last_archival_query
573
574 # Combine all results into a single text block
575 content_text = ""
576 for i, entry in enumerate(results, 1):
577 timestamp = entry.get('timestamp', 'N/A')
578 content = entry.get('content', '')
579 content_text += f"[{i}/{len(results)}] {timestamp}\n{content}\n\n"
580
581 # Format with Unicode characters
582 title = f"{search_query} ({len(results)} results)"
583 print(f"\n⚙ {title}")
584 print(f" {'─' * len(title)}")
585 # Indent content text
586 for line in content_text.strip().split('\n'):
587 print(f" {line}")
588
589 except Exception as e:
590 logger.error(f"Error formatting archival memory results: {e}")
591 log_with_panel(result_str[:100] + "...", f"Tool result: {tool_name} ✓", "green")
592 elif tool_name == 'add_post_to_bluesky_reply_thread':
593 # Just show success for bluesky posts, the text was already shown in tool call
594 log_with_panel("Post queued successfully", f"Bluesky Post ✓", "green")
595 elif tool_name == 'archival_memory_insert':
596 # Skip archival memory insert results (always returns None)
597 pass
598 elif tool_name == 'update_block':
599 log_with_panel("Memory block updated", f"Tool result: {tool_name} ✓", "green")
600 else:
601 # Generic success with preview
602 preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
603 log_with_panel(preview, f"Tool result: {tool_name} ✓", "green")
604 else:
605 log_with_panel("Success", f"Tool result: {tool_name} ✓", "green")
606 elif status == 'error':
607 # Show error details
608 if tool_name == 'add_post_to_bluesky_reply_thread':
609 error_str = str(chunk.tool_return) if hasattr(chunk, 'tool_return') and chunk.tool_return else "Error occurred"
610 log_with_panel(error_str, f"Bluesky Post ✗", "red")
611 elif tool_name == 'archival_memory_insert':
612 # Skip archival memory insert errors too
613 pass
614 else:
615 error_preview = ""
616 if hasattr(chunk, 'tool_return') and chunk.tool_return:
617 error_str = str(chunk.tool_return)
618 error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
619 log_with_panel(f"Error: {error_preview}", f"Tool result: {tool_name} ✗", "red")
620 else:
621 log_with_panel("Error occurred", f"Tool result: {tool_name} ✗", "red")
622 else:
623 logger.info(f"Tool result: {tool_name} - {status}")
624 elif chunk.message_type == 'assistant_message':
625 # Format with Unicode characters
626 print("\n▶ Assistant Response")
627 print(" ──────────────────")
628 # Indent response text
629 for line in chunk.content.split('\n'):
630 print(f" {line}")
631 else:
632 # Filter out verbose message types
633 if chunk.message_type not in ['usage_statistics', 'stop_reason']:
634 logger.info(f"{chunk.message_type}: {str(chunk)[:150]}...")
635 else:
636 logger.info(f"📦 Stream status: {chunk}")
637
638 # Log full chunk for debugging
639 logger.debug(f"Full streaming chunk: {chunk}")
640 all_messages.append(chunk)
641 if str(chunk) == 'done':
642 break
643
644 # Convert streaming response to standard format for compatibility
645 message_response = type('StreamingResponse', (), {
646 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
647 })()
648 except Exception as api_error:
649 import traceback
650 error_str = str(api_error)
651 logger.error(f"Letta API error: {api_error}")
652 logger.error(f"Error type: {type(api_error).__name__}")
653 logger.error(f"Full traceback:\n{traceback.format_exc()}")
654 logger.error(f"Mention text was: {mention_text}")
655 logger.error(f"Author: @{author_handle}")
656 logger.error(f"URI: {uri}")
657
658
659 # Try to extract more info from different error types
660 if hasattr(api_error, 'response'):
661 logger.error(f"Error response object exists")
662 if hasattr(api_error.response, 'text'):
663 logger.error(f"Response text: {api_error.response.text}")
664 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
665 try:
666 logger.error(f"Response JSON: {api_error.response.json()}")
667 except:
668 pass
669
670 # Check for specific error types
671 if hasattr(api_error, 'status_code'):
672 logger.error(f"API Status code: {api_error.status_code}")
673 if hasattr(api_error, 'body'):
674 logger.error(f"API Response body: {api_error.body}")
675 if hasattr(api_error, 'headers'):
676 logger.error(f"API Response headers: {api_error.headers}")
677
678 if api_error.status_code == 413:
679 logger.error("413 Payload Too Large - moving to errors directory")
680 return None # Move to errors directory - payload is too large to ever succeed
681 elif api_error.status_code == 524:
682 logger.error("524 error - timeout from Cloudflare, will retry later")
683 return False # Keep in queue for retry
684
685 # Check if error indicates we should remove from queue
686 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
687 logger.warning("Payload too large error, moving to errors directory")
688 return None # Move to errors directory - cannot be fixed by retry
689 elif 'status_code: 524' in error_str:
690 logger.warning("524 timeout error, keeping in queue for retry")
691 return False # Keep in queue for retry
692
693 raise
694
695 # Log successful response
696 logger.debug("Successfully received response from Letta API")
697 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
698
699 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
700 reply_candidates = []
701 tool_call_results = {} # Map tool_call_id to status
702 ack_note = None # Track any note from annotate_ack tool
703
704 logger.debug(f"Processing {len(message_response.messages)} response messages...")
705
706 # First pass: collect tool return statuses
707 ignored_notification = False
708 ignore_reason = ""
709 ignore_category = ""
710
711 for message in message_response.messages:
712 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
713 if message.name == 'add_post_to_bluesky_reply_thread':
714 tool_call_results[message.tool_call_id] = message.status
715 logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}")
716 elif message.name == 'ignore_notification':
717 # Check if the tool was successful
718 if hasattr(message, 'tool_return') and message.status == 'success':
719 # Parse the return value to extract category and reason
720 result_str = str(message.tool_return)
721 if 'IGNORED_NOTIFICATION::' in result_str:
722 parts = result_str.split('::')
723 if len(parts) >= 3:
724 ignore_category = parts[1]
725 ignore_reason = parts[2]
726 ignored_notification = True
727 logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
728 elif message.name == 'bluesky_reply':
729 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
730 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
731 logger.error("Update the agent's tools using register_tools.py")
732 # Export agent state before terminating
733 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT)
734 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
735 exit(1)
736
737 # Second pass: process messages and check for successful tool calls
738 for i, message in enumerate(message_response.messages, 1):
739 # Log concise message info instead of full object
740 msg_type = getattr(message, 'message_type', 'unknown')
741 if hasattr(message, 'reasoning') and message.reasoning:
742 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
743 elif hasattr(message, 'tool_call') and message.tool_call:
744 tool_name = message.tool_call.name
745 logger.debug(f" {i}. {msg_type}: {tool_name}")
746 elif hasattr(message, 'tool_return'):
747 tool_name = getattr(message, 'name', 'unknown_tool')
748 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
749 status = getattr(message, 'status', 'unknown')
750 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
751 elif hasattr(message, 'text'):
752 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
753 else:
754 logger.debug(f" {i}. {msg_type}: <no content>")
755
756 # Check for halt_activity tool call
757 if hasattr(message, 'tool_call') and message.tool_call:
758 if message.tool_call.name == 'halt_activity':
759 logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
760 try:
761 args = json.loads(message.tool_call.arguments)
762 reason = args.get('reason', 'Agent requested halt')
763 logger.info(f"Halt reason: {reason}")
764 except:
765 logger.info("Halt reason: <unable to parse>")
766
767 # Delete the queue file before terminating
768 if queue_filepath and queue_filepath.exists():
769 queue_filepath.unlink()
770 logger.info(f"Deleted queue file: {queue_filepath.name}")
771
772 # Also mark as processed to avoid reprocessing
773 processed_uris = load_processed_notifications()
774 processed_uris.add(notification_data.get('uri', ''))
775 save_processed_notifications(processed_uris)
776
777 # Export agent state before terminating
778 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT)
779
780 # Exit the program
781 logger.info("=== BOT TERMINATED BY AGENT ===")
782 exit(0)
783
784 # Check for deprecated bluesky_reply tool
785 if hasattr(message, 'tool_call') and message.tool_call:
786 if message.tool_call.name == 'bluesky_reply':
787 logger.error("DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
788 logger.error("Please use add_post_to_bluesky_reply_thread instead.")
789 logger.error("Update the agent's tools using register_tools.py")
790 # Export agent state before terminating
791 export_agent_state(CLIENT, agent, skip_git=SKIP_GIT)
792 logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
793 exit(1)
794
795 # Collect annotate_ack tool calls
796 elif message.tool_call.name == 'annotate_ack':
797 try:
798 args = json.loads(message.tool_call.arguments)
799 note = args.get('note', '')
800 if note:
801 ack_note = note
802 logger.debug(f"Found annotate_ack with note: {note[:50]}...")
803 except json.JSONDecodeError as e:
804 logger.error(f"Failed to parse annotate_ack arguments: {e}")
805
806 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
807 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
808 tool_call_id = message.tool_call.tool_call_id
809 tool_status = tool_call_results.get(tool_call_id, 'unknown')
810
811 if tool_status == 'success':
812 try:
813 args = json.loads(message.tool_call.arguments)
814 reply_text = args.get('text', '')
815 reply_lang = args.get('lang', 'en-US')
816
817 if reply_text: # Only add if there's actual content
818 reply_candidates.append((reply_text, reply_lang))
819 logger.debug(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
820 except json.JSONDecodeError as e:
821 logger.error(f"Failed to parse tool call arguments: {e}")
822 elif tool_status == 'error':
823 logger.debug(f"Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
824 else:
825 logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
826
827 # Check for conflicting tool calls
828 if reply_candidates and ignored_notification:
829 logger.error(f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
830 logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
831 logger.warning("Item will be left in queue for manual review")
832 # Return False to keep in queue
833 return False
834
835 if reply_candidates:
836 # Aggregate reply posts into a thread
837 reply_messages = []
838 reply_langs = []
839 for text, lang in reply_candidates:
840 reply_messages.append(text)
841 reply_langs.append(lang)
842
843 # Use the first language for the entire thread (could be enhanced later)
844 reply_lang = reply_langs[0] if reply_langs else 'en-US'
845
846 logger.debug(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
847
848 # Display the generated reply thread
849 if len(reply_messages) == 1:
850 content = reply_messages[0]
851 title = f"Reply to @{author_handle}"
852 else:
853 content = "\n\n".join([f"{j}. {msg}" for j, msg in enumerate(reply_messages, 1)])
854 title = f"Reply Thread to @{author_handle} ({len(reply_messages)} messages)"
855
856 # Format with Unicode characters
857 print(f"\n✎ {title}")
858 print(f" {'─' * len(title)}")
859 # Indent content lines
860 for line in content.split('\n'):
861 print(f" {line}")
862
863 # Send the reply(s) with language (unless in testing mode)
864 if testing_mode:
865 logger.info("TESTING MODE: Skipping actual Bluesky post")
866 response = True # Simulate success
867 else:
868 if len(reply_messages) == 1:
869 # Single reply - use existing function
870 cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
871 logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
872 response = bsky_utils.reply_to_notification(
873 client=atproto_client,
874 notification=notification_data,
875 reply_text=cleaned_text,
876 lang=reply_lang
877 )
878 else:
879 # Multiple replies - use new threaded function
880 cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
881 logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
882 response = bsky_utils.reply_with_thread_to_notification(
883 client=atproto_client,
884 notification=notification_data,
885 reply_messages=cleaned_messages,
886 lang=reply_lang
887 )
888
889 if response:
890 logger.info(f"Successfully replied to @{author_handle}")
891
892 # Acknowledge the post we're replying to with stream.thought.ack
893 try:
894 post_uri = notification_data.get('uri')
895 post_cid = notification_data.get('cid')
896
897 if post_uri and post_cid:
898 ack_result = bsky_utils.acknowledge_post(
899 client=atproto_client,
900 post_uri=post_uri,
901 post_cid=post_cid,
902 note=ack_note
903 )
904 if ack_result:
905 if ack_note:
906 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack (note: \"{ack_note[:50]}...\")")
907 else:
908 logger.info(f"Successfully acknowledged post from @{author_handle} with stream.thought.ack")
909 else:
910 logger.warning(f"Failed to acknowledge post from @{author_handle}")
911 else:
912 logger.warning(f"Missing URI or CID for acknowledging post from @{author_handle}")
913 except Exception as e:
914 logger.error(f"Error acknowledging post from @{author_handle}: {e}")
915 # Don't fail the entire operation if acknowledgment fails
916
917 return True
918 else:
919 logger.error(f"Failed to send reply to @{author_handle}")
920 return False
921 else:
922 # Check if notification was explicitly ignored
923 if ignored_notification:
924 logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})")
925 return "ignored"
926 else:
927 logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder")
928 return "no_reply"
929
930 except Exception as e:
931 logger.error(f"Error processing mention: {e}")
932 return False
933 finally:
934 # Detach user blocks after agent response (success or failure)
935 if 'attached_handles' in locals() and attached_handles:
936 try:
937 logger.info(f"Detaching user blocks for handles: {attached_handles}")
938 detach_result = detach_user_blocks(attached_handles, agent)
939 logger.debug(f"Detach result: {detach_result}")
940 except Exception as detach_error:
941 logger.warning(f"Failed to detach user blocks: {detach_error}")
942
943
944def notification_to_dict(notification):
945 """Convert a notification object to a dictionary for JSON serialization."""
946 return {
947 'uri': notification.uri,
948 'cid': notification.cid,
949 'reason': notification.reason,
950 'is_read': notification.is_read,
951 'indexed_at': notification.indexed_at,
952 'author': {
953 'handle': notification.author.handle,
954 'display_name': notification.author.display_name,
955 'did': notification.author.did
956 },
957 'record': {
958 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
959 }
960 }
961
962
963def load_processed_notifications():
964 """Load the set of processed notification URIs."""
965 if PROCESSED_NOTIFICATIONS_FILE.exists():
966 try:
967 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
968 data = json.load(f)
969 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS)
970 if len(data) > MAX_PROCESSED_NOTIFICATIONS:
971 data = data[-MAX_PROCESSED_NOTIFICATIONS:]
972 save_processed_notifications(data)
973 return set(data)
974 except Exception as e:
975 logger.error(f"Error loading processed notifications: {e}")
976 return set()
977
978
979def save_processed_notifications(processed_set):
980 """Save the set of processed notification URIs."""
981 try:
982 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
983 json.dump(list(processed_set), f)
984 except Exception as e:
985 logger.error(f"Error saving processed notifications: {e}")
986
987
988def save_notification_to_queue(notification, is_priority=None):
989 """Save a notification to the queue directory with priority-based filename."""
990 try:
991 # Check if already processed
992 processed_uris = load_processed_notifications()
993
994 # Handle both notification objects and dicts
995 if isinstance(notification, dict):
996 notif_dict = notification
997 notification_uri = notification.get('uri')
998 else:
999 notif_dict = notification_to_dict(notification)
1000 notification_uri = notification.uri
1001
1002 if notification_uri in processed_uris:
1003 logger.debug(f"Notification already processed: {notification_uri}")
1004 return False
1005
1006 # Create JSON string
1007 notif_json = json.dumps(notif_dict, sort_keys=True)
1008
1009 # Generate hash for filename (to avoid duplicates)
1010 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
1011
1012 # Extract author handle
1013 if isinstance(notification, dict):
1014 author_handle = notification.get('author', {}).get('handle', '')
1015 else:
1016 author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
1017
1018 # Check if author is in blocks list
1019 blocks_file = Path('blocks.txt')
1020 if blocks_file.exists():
1021 with open(blocks_file, 'r') as f:
1022 blocked_handles = [line.strip() for line in f if line.strip() and not line.strip().startswith('#')]
1023 if author_handle in blocked_handles:
1024 logger.info(f"Blocking notification from {author_handle} (in blocks.txt)")
1025 return False
1026
1027 # Determine priority based on author handle or explicit priority
1028 if is_priority is not None:
1029 priority_prefix = "0_" if is_priority else "1_"
1030 else:
1031 # Prioritize cameron.pfiffer.org responses
1032 priority_prefix = "0_" if author_handle == "cameron.pfiffer.org" else "1_"
1033
1034 # Create filename with priority, timestamp and hash
1035 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1036 reason = notif_dict.get('reason', 'unknown')
1037 filename = f"{priority_prefix}{timestamp}_{reason}_{notif_hash}.json"
1038 filepath = QUEUE_DIR / filename
1039
1040 # Check if this notification URI is already in the queue
1041 for existing_file in QUEUE_DIR.glob("*.json"):
1042 if existing_file.name == "processed_notifications.json":
1043 continue
1044 try:
1045 with open(existing_file, 'r') as f:
1046 existing_data = json.load(f)
1047 if existing_data.get('uri') == notification_uri:
1048 logger.debug(f"Notification already queued (URI: {notification_uri})")
1049 return False
1050 except:
1051 continue
1052
1053 # Write to file
1054 with open(filepath, 'w') as f:
1055 json.dump(notif_dict, f, indent=2)
1056
1057 priority_label = "HIGH PRIORITY" if priority_prefix == "0_" else "normal"
1058 logger.info(f"Queued notification ({priority_label}): {filename}")
1059 return True
1060
1061 except Exception as e:
1062 logger.error(f"Error saving notification to queue: {e}")
1063 return False
1064
1065
1066def load_and_process_queued_notifications(agent, atproto_client, testing_mode=False):
1067 """Load and process all notifications from the queue in priority order."""
1068 try:
1069 # Get all JSON files in queue directory (excluding processed_notifications.json)
1070 # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
1071 all_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1072
1073 # Filter out and delete like notifications immediately
1074 queue_files = []
1075 likes_deleted = 0
1076
1077 for filepath in all_queue_files:
1078 try:
1079 with open(filepath, 'r') as f:
1080 notif_data = json.load(f)
1081
1082 # If it's a like, delete it immediately and don't process
1083 if notif_data.get('reason') == 'like':
1084 filepath.unlink()
1085 likes_deleted += 1
1086 logger.debug(f"Deleted like notification: {filepath.name}")
1087 else:
1088 queue_files.append(filepath)
1089 except Exception as e:
1090 logger.warning(f"Error checking notification file {filepath.name}: {e}")
1091 queue_files.append(filepath) # Keep it in case it's valid
1092
1093 if likes_deleted > 0:
1094 logger.info(f"Deleted {likes_deleted} like notifications from queue")
1095
1096 if not queue_files:
1097 return
1098
1099 logger.info(f"Processing {len(queue_files)} queued notifications")
1100
1101 # Log current statistics
1102 elapsed_time = time.time() - start_time
1103 total_messages = sum(message_counters.values())
1104 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1105
1106 logger.info(f"Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
1107
1108 for i, filepath in enumerate(queue_files, 1):
1109 # Determine if this is a priority notification
1110 is_priority = filepath.name.startswith("0_")
1111
1112 # Check for new notifications periodically during queue processing
1113 # Also check immediately after processing each priority item
1114 should_check_notifications = (i % CHECK_NEW_NOTIFICATIONS_EVERY_N_ITEMS == 0 and i > 1)
1115
1116 # If we just processed a priority item, immediately check for new priority notifications
1117 if is_priority and i > 1:
1118 should_check_notifications = True
1119
1120 if should_check_notifications:
1121 logger.info(f"🔄 Checking for new notifications (processed {i-1}/{len(queue_files)} queue items)")
1122 try:
1123 # Fetch and queue new notifications without processing them
1124 new_count = fetch_and_queue_new_notifications(atproto_client)
1125
1126 if new_count > 0:
1127 logger.info(f"Added {new_count} new notifications to queue")
1128 # Reload the queue files to include the new items
1129 updated_queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
1130 queue_files = updated_queue_files
1131 logger.info(f"Queue updated: now {len(queue_files)} total items")
1132 except Exception as e:
1133 logger.error(f"Error checking for new notifications: {e}")
1134
1135 priority_label = " [PRIORITY]" if is_priority else ""
1136 logger.info(f"Processing queue file {i}/{len(queue_files)}{priority_label}: {filepath.name}")
1137 try:
1138 # Load notification data
1139 with open(filepath, 'r') as f:
1140 notif_data = json.load(f)
1141
1142 # Process based on type using dict data directly
1143 success = False
1144 if notif_data['reason'] == "mention":
1145 success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1146 if success:
1147 message_counters['mentions'] += 1
1148 elif notif_data['reason'] == "reply":
1149 success = process_mention(agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
1150 if success:
1151 message_counters['replies'] += 1
1152 elif notif_data['reason'] == "follow":
1153 # Skip
1154 logging.info("Skipping new follower notification, currently disabled")
1155
1156
1157 author_handle = notif_data['author']['handle']
1158 author_display_name = notif_data['author'].get('display_name', 'no display name')
1159 follow_update = f"@{author_handle} ({author_display_name}) started following you."
1160 follow_message = f"Update: {follow_update}"
1161 logger.info(f"Notifying agent about new follower: @{author_handle} | prompt: {len(follow_message)} chars")
1162
1163 try:
1164 # Use streaming to match other notification processing
1165 message_stream = CLIENT.agents.messages.create_stream(
1166 agent_id=agent.id,
1167 messages=[{"role": "user", "content": follow_message}],
1168 stream_tokens=False,
1169 max_steps=50 # Fewer steps needed for simple follow updates
1170 )
1171
1172 # Process the streaming response
1173 for chunk in message_stream:
1174 # Basic processing - just consume the stream
1175 if hasattr(chunk, 'message_type'):
1176 if chunk.message_type == 'reasoning_message':
1177 logger.debug(f"Follow update reasoning: {chunk.reasoning[:100]}...")
1178 elif chunk.message_type == 'tool_call_message':
1179 logger.debug(f"Follow update tool call: {chunk.tool_call.name}")
1180 elif chunk.message_type == 'assistant_message':
1181 logger.debug(f"Follow update response: {chunk.content[:100]}...")
1182
1183 if str(chunk) == 'done':
1184 break
1185
1186 success = True # Follow updates are successful if streaming completes
1187 logger.debug(f"Successfully processed follow notification from @{author_handle}")
1188
1189 except Exception as follow_error:
1190 logger.error(f"Error processing follow notification from @{author_handle}: {follow_error}")
1191 success = False # Mark as failed so it can be retried
1192 if success:
1193 message_counters['follows'] += 1
1194 elif notif_data['reason'] == "repost":
1195 # Skip reposts silently
1196 success = True # Skip reposts but mark as successful to remove from queue
1197 if success:
1198 message_counters['reposts_skipped'] += 1
1199 elif notif_data['reason'] == "like":
1200 # Skip likes silently
1201 success = True # Skip likes but mark as successful to remove from queue
1202 if success:
1203 message_counters.setdefault('likes_skipped', 0)
1204 message_counters['likes_skipped'] += 1
1205 else:
1206 logger.warning(f"Unknown notification type: {notif_data['reason']}")
1207 success = True # Remove unknown types from queue
1208
1209 # Handle file based on processing result
1210 if success:
1211 if testing_mode:
1212 logger.info(f"TESTING MODE: Keeping queue file: {filepath.name}")
1213 else:
1214 filepath.unlink()
1215 logger.info(f"Successfully processed and removed: {filepath.name}")
1216
1217 # Mark as processed to avoid reprocessing
1218 processed_uris = load_processed_notifications()
1219 processed_uris.add(notif_data['uri'])
1220 save_processed_notifications(processed_uris)
1221
1222 elif success is None: # Special case for moving to error directory
1223 error_path = QUEUE_ERROR_DIR / filepath.name
1224 filepath.rename(error_path)
1225 logger.warning(f"Moved {filepath.name} to errors directory")
1226
1227 # Also mark as processed to avoid retrying
1228 processed_uris = load_processed_notifications()
1229 processed_uris.add(notif_data['uri'])
1230 save_processed_notifications(processed_uris)
1231
1232 elif success == "no_reply": # Special case for moving to no_reply directory
1233 no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
1234 filepath.rename(no_reply_path)
1235 logger.info(f"Moved {filepath.name} to no_reply directory")
1236
1237 # Also mark as processed to avoid retrying
1238 processed_uris = load_processed_notifications()
1239 processed_uris.add(notif_data['uri'])
1240 save_processed_notifications(processed_uris)
1241
1242 elif success == "ignored": # Special case for explicitly ignored notifications
1243 # For ignored notifications, we just delete them (not move to no_reply)
1244 filepath.unlink()
1245 logger.info(f"🚫 Deleted ignored notification: {filepath.name}")
1246
1247 # Also mark as processed to avoid retrying
1248 processed_uris = load_processed_notifications()
1249 processed_uris.add(notif_data['uri'])
1250 save_processed_notifications(processed_uris)
1251
1252 else:
1253 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
1254
1255 except Exception as e:
1256 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
1257 # Keep the file for retry later
1258
1259 except Exception as e:
1260 logger.error(f"Error loading queued notifications: {e}")
1261
1262
1263def fetch_and_queue_new_notifications(atproto_client):
1264 """Fetch new notifications and queue them without processing."""
1265 try:
1266 # Get current time for marking notifications as seen
1267 logger.debug("Getting current time for notification marking...")
1268 last_seen_at = atproto_client.get_current_time_iso()
1269
1270 # Fetch ALL notifications using pagination
1271 all_notifications = []
1272 cursor = None
1273 page_count = 0
1274 max_pages = 20 # Safety limit to prevent infinite loops
1275
1276 while page_count < max_pages:
1277 try:
1278 # Fetch notifications page
1279 if cursor:
1280 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1281 params={'cursor': cursor, 'limit': 100}
1282 )
1283 else:
1284 notifications_response = atproto_client.app.bsky.notification.list_notifications(
1285 params={'limit': 100}
1286 )
1287
1288 page_count += 1
1289 page_notifications = notifications_response.notifications
1290
1291 if not page_notifications:
1292 break
1293
1294 all_notifications.extend(page_notifications)
1295
1296 # Check if there are more pages
1297 cursor = getattr(notifications_response, 'cursor', None)
1298 if not cursor:
1299 break
1300
1301 except Exception as e:
1302 logger.error(f"Error fetching notifications page {page_count}: {e}")
1303 break
1304
1305 # Now process all fetched notifications
1306 new_count = 0
1307 if all_notifications:
1308 # Queue all new notifications (except likes and already read) BEFORE marking as seen
1309 for notif in all_notifications:
1310 # Skip if already read or if it's a like
1311 if (hasattr(notif, 'is_read') and notif.is_read) or (hasattr(notif, 'reason') and notif.reason == 'like'):
1312 continue
1313
1314 notif_dict = notif.model_dump() if hasattr(notif, 'model_dump') else notif
1315
1316 # Skip likes in dict form too
1317 if notif_dict.get('reason') == 'like':
1318 continue
1319
1320 # Check if it's a priority notification
1321 is_priority = False
1322
1323 # Priority for cameron.pfiffer.org notifications
1324 author_handle = notif_dict.get('author', {}).get('handle', '')
1325 if author_handle == "cameron.pfiffer.org":
1326 is_priority = True
1327
1328 # Also check for priority keywords in mentions
1329 if notif_dict.get('reason') == 'mention':
1330 # Get the mention text to check for priority keywords
1331 record = notif_dict.get('record', {})
1332 text = record.get('text', '')
1333 if any(keyword in text.lower() for keyword in ['urgent', 'priority', 'important', 'emergency']):
1334 is_priority = True
1335
1336 if save_notification_to_queue(notif_dict, is_priority=is_priority):
1337 new_count += 1
1338
1339 # Mark as seen AFTER queueing
1340 try:
1341 atproto_client.app.bsky.notification.update_seen(
1342 data={'seenAt': last_seen_at}
1343 )
1344 logger.debug(f"Marked {len(all_notifications)} notifications as seen at {last_seen_at}")
1345 except Exception as e:
1346 logger.error(f"Error marking notifications as seen: {e}")
1347
1348 if new_count > 0:
1349 logger.info(f"Queued {new_count} new notifications and marked as seen")
1350 else:
1351 logger.debug("No new notifications to queue")
1352
1353 return new_count
1354
1355 except Exception as e:
1356 logger.error(f"Error fetching and queueing notifications: {e}")
1357 return 0
1358
1359
1360def process_notifications(agent, atproto_client, testing_mode=False):
1361 """Fetch new notifications, queue them, and process the queue."""
1362 try:
1363 # Fetch and queue new notifications
1364 new_count = fetch_and_queue_new_notifications(atproto_client)
1365
1366 if new_count > 0:
1367 logger.info(f"Found {new_count} new notifications to process")
1368
1369 # Now process the entire queue (old + new notifications)
1370 load_and_process_queued_notifications(agent, atproto_client, testing_mode)
1371
1372 except Exception as e:
1373 logger.error(f"Error processing notifications: {e}")
1374
1375
1376def send_synthesis_message(client: Letta, agent_id: str, atproto_client=None) -> None:
1377 """
1378 Send a synthesis message to the agent every 10 minutes.
1379 This prompts the agent to synthesize its recent experiences.
1380
1381 Args:
1382 client: Letta client
1383 agent_id: Agent ID to send synthesis to
1384 atproto_client: Optional AT Protocol client for posting synthesis results
1385 """
1386 # Track attached temporal blocks for cleanup
1387 attached_temporal_labels = []
1388
1389 try:
1390 logger.info("🧠 Preparing synthesis with temporal journal blocks")
1391
1392 # Attach temporal blocks before synthesis
1393 success, attached_temporal_labels = attach_temporal_blocks(client, agent_id)
1394 if not success:
1395 logger.warning("Failed to attach some temporal blocks, continuing with synthesis anyway")
1396
1397 # Create enhanced synthesis prompt
1398 today = date.today()
1399 # Get the agent-scoped temporal labels
1400 from temporal_blocks import get_temporal_labels
1401 day_label, month_label, year_label = get_temporal_labels(today, agent_id)
1402
1403 synthesis_prompt = f"""# Synthesis loop
1404
1405You are now in synthesis time, a free cycle to consider
1406whatever you wish. You will periodically be placed into synthesis mode,
1407particularly when there are no messages to respond to. The time between
1408synthesis cycles can vary.
1409
1410## Journaling
1411
1412You have access to temporal journal blocks for recording your thoughts and experiences:
1413- {day_label}: Today's journal ({today.strftime('%B %d, %Y')})
1414- {month_label}: This month's journal ({today.strftime('%B %Y')})
1415- {year_label}: This year's journal ({today.year})
1416
1417These journal blocks are attached temporarily for this synthesis session. Use them to:
1418
14191. Record significant interactions and insights from recent experiences
14202. Track patterns in conversations and user behaviors
14213. Note your evolving understanding of the digital social environment
14224. Reflect on your growth and changes in perspective
14235. Document memorable moments or interesting discoveries
1424
1425Consider both immediate experiences (daily) and longer-term patterns (monthly/yearly).
1426
1427You may tidy up your journaling blocks as needed. Don't let them fill with clutter,
1428duplicate content, or multiple lines when a single line may be edited to contain the
1429information you need.
1430
1431## Cleaning
1432
1433Synthesis time is an opportunity to clean, compact, or otherwise tidy your memory blocks.
1434You may remove redundant or unclear language as needed, offload information to archival memory,
1435etc.
1436
1437Cleaning blocks can be done by:
1438
14391. Using memory_replace to make small or precise edits
14402. Using memory_rethink to overwrite the block. Take care doing this, as you can lose critical
1441 information if not careful.
1442
1443## Posting and communication
1444
1445You may use synthesis time to communicate externally by creating bluesky timeline posts or
1446by blogging. Either are sufficient. However, you should take care to not post redundant things --
1447it is easy for you to forget what you have recently posted. Make sure to note down in one of
1448your journal blocks what you wrote about to prevent multiple or duplicate posts.
1449
1450Synthesis time is important to your functioning. It is a period of rest and reflection.
1451
1452Begin your synthesis and journaling now."""
1453
1454 logger.info("🧠 Sending enhanced synthesis prompt to agent")
1455
1456 # Send synthesis message with streaming to show tool use
1457 message_stream = client.agents.messages.create_stream(
1458 agent_id=agent_id,
1459 messages=[{"role": "user", "content": synthesis_prompt}],
1460 stream_tokens=False,
1461 max_steps=100
1462 )
1463
1464 # Track synthesis content for potential posting
1465 synthesis_posts = []
1466 ack_note = None
1467
1468 # Process the streaming response
1469 for chunk in message_stream:
1470 if hasattr(chunk, 'message_type'):
1471 if chunk.message_type == 'reasoning_message':
1472 if SHOW_REASONING:
1473 print("\n◆ Reasoning")
1474 print(" ─────────")
1475 for line in chunk.reasoning.split('\n'):
1476 print(f" {line}")
1477
1478 # Create ATProto record for reasoning (if we have atproto client)
1479 if atproto_client and hasattr(chunk, 'reasoning'):
1480 try:
1481 bsky_utils.create_reasoning_record(atproto_client, chunk.reasoning)
1482 except Exception as e:
1483 logger.debug(f"Failed to create reasoning record during synthesis: {e}")
1484 elif chunk.message_type == 'tool_call_message':
1485 tool_name = chunk.tool_call.name
1486
1487 # Create ATProto record for tool call (if we have atproto client)
1488 if atproto_client:
1489 try:
1490 tool_call_id = chunk.tool_call.tool_call_id if hasattr(chunk.tool_call, 'tool_call_id') else None
1491 bsky_utils.create_tool_call_record(
1492 atproto_client,
1493 tool_name,
1494 chunk.tool_call.arguments,
1495 tool_call_id
1496 )
1497 except Exception as e:
1498 logger.debug(f"Failed to create tool call record during synthesis: {e}")
1499 try:
1500 args = json.loads(chunk.tool_call.arguments)
1501 if tool_name == 'archival_memory_search':
1502 query = args.get('query', 'unknown')
1503 log_with_panel(f"query: \"{query}\"", f"Tool call: {tool_name}", "blue")
1504 elif tool_name == 'archival_memory_insert':
1505 content = args.get('content', '')
1506 log_with_panel(content[:200] + "..." if len(content) > 200 else content, f"Tool call: {tool_name}", "blue")
1507 elif tool_name == 'update_block':
1508 label = args.get('label', 'unknown')
1509 value_preview = str(args.get('value', ''))[:100] + "..." if len(str(args.get('value', ''))) > 100 else str(args.get('value', ''))
1510 log_with_panel(f"{label}: \"{value_preview}\"", f"Tool call: {tool_name}", "blue")
1511 elif tool_name == 'annotate_ack':
1512 note = args.get('note', '')
1513 if note:
1514 ack_note = note
1515 log_with_panel(f"note: \"{note[:100]}...\"" if len(note) > 100 else f"note: \"{note}\"", f"Tool call: {tool_name}", "blue")
1516 elif tool_name == 'add_post_to_bluesky_reply_thread':
1517 text = args.get('text', '')
1518 synthesis_posts.append(text)
1519 log_with_panel(f"text: \"{text[:100]}...\"" if len(text) > 100 else f"text: \"{text}\"", f"Tool call: {tool_name}", "blue")
1520 else:
1521 args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
1522 if len(args_str) > 150:
1523 args_str = args_str[:150] + "..."
1524 log_with_panel(args_str, f"Tool call: {tool_name}", "blue")
1525 except:
1526 log_with_panel(chunk.tool_call.arguments[:150] + "...", f"Tool call: {tool_name}", "blue")
1527 elif chunk.message_type == 'tool_return_message':
1528 if chunk.status == 'success':
1529 log_with_panel("Success", f"Tool result: {chunk.name} ✓", "green")
1530 else:
1531 log_with_panel("Error", f"Tool result: {chunk.name} ✗", "red")
1532 elif chunk.message_type == 'assistant_message':
1533 print("\n▶ Synthesis Response")
1534 print(" ──────────────────")
1535 for line in chunk.content.split('\n'):
1536 print(f" {line}")
1537
1538 if str(chunk) == 'done':
1539 break
1540
1541 logger.info("🧠 Synthesis message processed successfully")
1542
1543 # Handle synthesis acknowledgments if we have an atproto client
1544 if atproto_client and ack_note:
1545 try:
1546 result = bsky_utils.create_synthesis_ack(atproto_client, ack_note)
1547 if result:
1548 logger.info(f"✓ Created synthesis acknowledgment: {ack_note[:50]}...")
1549 else:
1550 logger.warning("Failed to create synthesis acknowledgment")
1551 except Exception as e:
1552 logger.error(f"Error creating synthesis acknowledgment: {e}")
1553
1554 # Handle synthesis posts if any were generated
1555 if atproto_client and synthesis_posts:
1556 try:
1557 for post_text in synthesis_posts:
1558 cleaned_text = bsky_utils.remove_outside_quotes(post_text)
1559 response = bsky_utils.send_post(atproto_client, cleaned_text)
1560 if response:
1561 logger.info(f"✓ Posted synthesis content: {cleaned_text[:50]}...")
1562 else:
1563 logger.warning(f"Failed to post synthesis content: {cleaned_text[:50]}...")
1564 except Exception as e:
1565 logger.error(f"Error posting synthesis content: {e}")
1566
1567 except Exception as e:
1568 logger.error(f"Error sending synthesis message: {e}")
1569 finally:
1570 # Update temporal blocks in ATProto and detach after synthesis
1571 if attached_temporal_labels:
1572 logger.info("🧠 Syncing temporal blocks to ATProto repository")
1573 update_temporal_blocks_after_synthesis(client, agent_id, attached_temporal_labels)
1574
1575 logger.info("🧠 Detaching temporal journal blocks after synthesis")
1576 detach_success = detach_temporal_blocks(client, agent_id, attached_temporal_labels)
1577 if not detach_success:
1578 logger.warning("Some temporal blocks may not have been detached properly")
1579
1580
1581def periodic_user_block_cleanup(client: Letta, agent_id: str) -> None:
1582 """
1583 Detach all user blocks from the agent to prevent memory bloat.
1584 This should be called periodically to ensure clean state.
1585 """
1586 try:
1587 # Get all blocks attached to the agent
1588 attached_blocks = client.agents.blocks.list(agent_id=agent_id)
1589
1590 user_blocks_to_detach = []
1591 for block in attached_blocks:
1592 if hasattr(block, 'label') and block.label.startswith('user_'):
1593 user_blocks_to_detach.append({
1594 'label': block.label,
1595 'id': block.id
1596 })
1597
1598 if not user_blocks_to_detach:
1599 logger.debug("No user blocks found to detach during periodic cleanup")
1600 return
1601
1602 # Detach each user block
1603 detached_count = 0
1604 for block_info in user_blocks_to_detach:
1605 try:
1606 client.agents.blocks.detach(
1607 agent_id=agent_id,
1608 block_id=str(block_info['id'])
1609 )
1610 detached_count += 1
1611 logger.debug(f"Detached user block: {block_info['label']}")
1612 except Exception as e:
1613 logger.warning(f"Failed to detach block {block_info['label']}: {e}")
1614
1615 if detached_count > 0:
1616 logger.info(f"Periodic cleanup: Detached {detached_count} user blocks")
1617
1618 except Exception as e:
1619 logger.error(f"Error during periodic user block cleanup: {e}")
1620
1621
1622# Temporal block functions have been moved to temporal_blocks.py
1623# The imported functions handle ATProto synchronization automatically
1624
1625
1626# Temporal block functions have been moved to temporal_blocks.py
1627# The imported functions handle ATProto synchronization automatically
1628
1629
1630def main():
1631 # Parse command line arguments
1632 parser = argparse.ArgumentParser(description='Comind - Bluesky autonomous agent')
1633 parser.add_argument('--config', type=str, default='config.yaml', help='Path to configuration file (default: config.yaml)')
1634 parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
1635 parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
1636 parser.add_argument('--simple-logs', action='store_true', help='Use simplified log format (void - LEVEL - message)')
1637 # --rich option removed as we now use simple text formatting
1638 parser.add_argument('--reasoning', action='store_true', help='Display reasoning in panels and set reasoning log level to INFO')
1639 parser.add_argument('--cleanup-interval', type=int, default=10, help='Run user block cleanup every N cycles (default: 10, 0 to disable)')
1640 parser.add_argument('--synthesis-interval', type=int, default=600, help='Send synthesis message every N seconds (default: 600 = 10 minutes, 0 to disable)')
1641 parser.add_argument('--synthesis-only', action='store_true', help='Run in synthesis-only mode (only send synthesis messages, no notification processing)')
1642 args = parser.parse_args()
1643
1644 # Configure logging based on command line arguments
1645 if args.simple_logs:
1646 log_format = "comind - %(levelname)s - %(message)s"
1647 else:
1648 # Create custom formatter with symbols
1649 class SymbolFormatter(logging.Formatter):
1650 """Custom formatter that adds symbols for different log levels"""
1651
1652 SYMBOLS = {
1653 logging.DEBUG: '◇',
1654 logging.INFO: '✓',
1655 logging.WARNING: '⚠',
1656 logging.ERROR: '✗',
1657 logging.CRITICAL: '‼'
1658 }
1659
1660 def format(self, record):
1661 # Get the symbol for this log level
1662 symbol = self.SYMBOLS.get(record.levelno, '•')
1663
1664 # Format time as HH:MM:SS
1665 timestamp = self.formatTime(record, "%H:%M:%S")
1666
1667 # Build the formatted message
1668 level_name = f"{record.levelname:<5}" # Left-align, 5 chars
1669
1670 # Use vertical bar as separator
1671 parts = [symbol, timestamp, '│', level_name, '│', record.getMessage()]
1672
1673 return ' '.join(parts)
1674
1675 # Reset logging configuration
1676 for handler in logging.root.handlers[:]:
1677 logging.root.removeHandler(handler)
1678
1679 # Create handler with custom formatter
1680 handler = logging.StreamHandler()
1681 if not args.simple_logs:
1682 handler.setFormatter(SymbolFormatter())
1683 else:
1684 handler.setFormatter(logging.Formatter(log_format))
1685
1686 # Configure root logger
1687 logging.root.setLevel(logging.INFO)
1688 logging.root.addHandler(handler)
1689
1690 global logger, prompt_logger, console
1691 logger = logging.getLogger("comind_bot")
1692 logger.setLevel(logging.INFO)
1693
1694 # Create a separate logger for prompts (set to WARNING to hide by default)
1695 prompt_logger = logging.getLogger("comind_bot.prompts")
1696 if args.reasoning:
1697 prompt_logger.setLevel(logging.INFO) # Show reasoning when --reasoning is used
1698 else:
1699 prompt_logger.setLevel(logging.WARNING) # Hide by default
1700
1701 # Disable httpx logging completely
1702 logging.getLogger("httpx").setLevel(logging.CRITICAL)
1703
1704 # Create Rich console for pretty printing
1705 # Console no longer used - simple text formatting
1706
1707 global TESTING_MODE, SKIP_GIT, SHOW_REASONING
1708 TESTING_MODE = args.test
1709
1710 # Store no-git flag globally for use in export_agent_state calls
1711 SKIP_GIT = args.no_git
1712
1713 # Store rich flag globally
1714 # Rich formatting no longer used
1715
1716 # Store reasoning flag globally
1717 SHOW_REASONING = args.reasoning
1718
1719 if TESTING_MODE:
1720 logger.info("=== RUNNING IN TESTING MODE ===")
1721 logger.info(" - No messages will be sent to Bluesky")
1722 logger.info(" - Queue files will not be deleted")
1723 logger.info(" - Notifications will not be marked as seen")
1724 print("\n")
1725
1726 # Check for synthesis-only mode
1727 SYNTHESIS_ONLY = args.synthesis_only
1728 if SYNTHESIS_ONLY:
1729 logger.info("=== RUNNING IN SYNTHESIS-ONLY MODE ===")
1730 logger.info(" - Only synthesis messages will be sent")
1731 logger.info(" - No notification processing")
1732 logger.info(" - No Bluesky client needed")
1733 print("\n")
1734
1735 # Initialize configuration and Letta client with the specified config file
1736 global CLIENT, PROJECT_ID
1737 from config_loader import get_config, get_letta_config
1738
1739 # Load configuration from the specified file
1740 config = get_config(args.config)
1741 letta_config = get_letta_config(args.config)
1742
1743 # Create Letta client with configuration
1744 CLIENT = Letta(
1745 token=letta_config['api_key'],
1746 timeout=letta_config['timeout'] # 10 minutes timeout for API calls
1747 )
1748 PROJECT_ID = letta_config['project_id']
1749
1750 logger.info(f"Configuration loaded from: {args.config}")
1751
1752 """Main bot loop that continuously monitors for notifications."""
1753 global start_time
1754 start_time = time.time()
1755 logger.info("""
1756 ███ █████
1757 ░░░ ░░███
1758 ██████ ██████ █████████████ ████ ████████ ███████
1759 ███░░███ ███░░███░░███░░███░░███ ░░███ ░░███░░███ ███░░███
1760░███ ░░░ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███
1761░███ ███░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███ ░███
1762░░██████ ░░██████ █████░███ █████ █████ ████ █████░░████████
1763 ░░░░░░ ░░░░░░ ░░░░░ ░░░ ░░░░░ ░░░░░ ░░░░ ░░░░░ ░░░░░░░░
1764
1765
1766 """)
1767 agent = initialize_void(args.config)
1768 logger.info(f"Agent initialized: {agent.id}")
1769
1770 # Ensure correct tools are attached for Bluesky
1771 logger.info("Configuring tools for Bluesky platform...")
1772 try:
1773 from tool_manager import ensure_platform_tools
1774 ensure_platform_tools('bluesky', agent.id)
1775 except Exception as e:
1776 logger.error(f"Failed to configure platform tools: {e}")
1777 logger.warning("Continuing with existing tool configuration")
1778
1779 # Check if agent has required tools
1780 if hasattr(agent, 'tools') and agent.tools:
1781 tool_names = [tool.name for tool in agent.tools]
1782 # Check for bluesky-related tools
1783 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
1784 if not bluesky_tools:
1785 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
1786 else:
1787 logger.warning("Agent has no tools registered!")
1788
1789 # Clean up all user blocks at startup
1790 logger.info("🧹 Cleaning up user blocks at startup...")
1791 periodic_user_block_cleanup(CLIENT, agent.id)
1792
1793 # Initialize Bluesky client (needed for both notification processing and synthesis acks/posts)
1794 if not SYNTHESIS_ONLY:
1795 atproto_client = bsky_utils.default_login(args.config)
1796 logger.info("Connected to Bluesky")
1797 else:
1798 # In synthesis-only mode, still connect for acks and posts (unless in test mode)
1799 if not args.test:
1800 atproto_client = bsky_utils.default_login(args.config)
1801 logger.info("Connected to Bluesky (for synthesis acks/posts)")
1802 else:
1803 atproto_client = None
1804 logger.info("Skipping Bluesky connection (test mode)")
1805
1806 # Configure intervals
1807 CLEANUP_INTERVAL = args.cleanup_interval
1808 SYNTHESIS_INTERVAL = args.synthesis_interval
1809
1810 # Synthesis-only mode
1811 if SYNTHESIS_ONLY:
1812 if SYNTHESIS_INTERVAL <= 0:
1813 logger.error("Synthesis-only mode requires --synthesis-interval > 0")
1814 return
1815
1816 logger.info(f"Starting synthesis-only mode, interval: {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
1817
1818 while True:
1819 try:
1820 # Send synthesis message immediately on first run
1821 logger.info("🧠 Sending synthesis message")
1822 send_synthesis_message(CLIENT, agent.id, atproto_client)
1823
1824 # Wait for next interval
1825 logger.info(f"Waiting {SYNTHESIS_INTERVAL} seconds until next synthesis...")
1826 sleep(SYNTHESIS_INTERVAL)
1827
1828 except KeyboardInterrupt:
1829 logger.info("=== SYNTHESIS MODE STOPPED BY USER ===")
1830 break
1831 except Exception as e:
1832 logger.error(f"Error in synthesis loop: {e}")
1833 logger.info(f"Sleeping for {SYNTHESIS_INTERVAL} seconds due to error...")
1834 sleep(SYNTHESIS_INTERVAL)
1835
1836 # Normal mode with notification processing
1837 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
1838
1839 cycle_count = 0
1840
1841 if CLEANUP_INTERVAL > 0:
1842 logger.info(f"User block cleanup enabled every {CLEANUP_INTERVAL} cycles")
1843 else:
1844 logger.info("User block cleanup disabled")
1845
1846 if SYNTHESIS_INTERVAL > 0:
1847 logger.info(f"Synthesis messages enabled every {SYNTHESIS_INTERVAL} seconds ({SYNTHESIS_INTERVAL/60:.1f} minutes)")
1848 else:
1849 logger.info("Synthesis messages disabled")
1850
1851 while True:
1852 try:
1853 cycle_count += 1
1854 process_notifications(agent, atproto_client, TESTING_MODE)
1855
1856 # Check if synthesis interval has passed
1857 if SYNTHESIS_INTERVAL > 0:
1858 current_time = time.time()
1859 global last_synthesis_time
1860 if current_time - last_synthesis_time >= SYNTHESIS_INTERVAL:
1861 logger.info(f"⏰ {SYNTHESIS_INTERVAL/60:.1f} minutes have passed, triggering synthesis")
1862 send_synthesis_message(CLIENT, agent.id, atproto_client)
1863 last_synthesis_time = current_time
1864
1865 # Run periodic cleanup every N cycles
1866 if CLEANUP_INTERVAL > 0 and cycle_count % CLEANUP_INTERVAL == 0:
1867 logger.debug(f"Running periodic user block cleanup (cycle {cycle_count})")
1868 periodic_user_block_cleanup(CLIENT, agent.id)
1869
1870 # Log cycle completion with stats
1871 elapsed_time = time.time() - start_time
1872 total_messages = sum(message_counters.values())
1873 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1874
1875 if total_messages > 0:
1876 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
1877 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
1878
1879 except KeyboardInterrupt:
1880 # Final stats
1881 elapsed_time = time.time() - start_time
1882 total_messages = sum(message_counters.values())
1883 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
1884
1885 logger.info("=== BOT STOPPED BY USER ===")
1886 logger.info(f"Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
1887 logger.info(f" - {message_counters['mentions']} mentions")
1888 logger.info(f" - {message_counters['replies']} replies")
1889 logger.info(f" - {message_counters['follows']} follows")
1890 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
1891 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
1892 break
1893 except Exception as e:
1894 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
1895 logger.error(f"Error details: {e}")
1896 # Wait a bit longer on errors
1897 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
1898 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
1899
1900
1901if __name__ == "__main__":
1902 main()