a digital person for bluesky
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9import subprocess
10from pathlib import Path
11from datetime import datetime
12from collections import defaultdict
13import time
14
15from utils import (
16 upsert_block,
17 upsert_agent
18)
19
20import bsky_utils
21from tools.blocks import attach_user_blocks, detach_user_blocks
22
23def extract_handles_from_data(data):
24 """Recursively extract all unique handles from nested data structure."""
25 handles = set()
26
27 def _extract_recursive(obj):
28 if isinstance(obj, dict):
29 # Check if this dict has a 'handle' key
30 if 'handle' in obj:
31 handles.add(obj['handle'])
32 # Recursively check all values
33 for value in obj.values():
34 _extract_recursive(value)
35 elif isinstance(obj, list):
36 # Recursively check all list items
37 for item in obj:
38 _extract_recursive(item)
39
40 _extract_recursive(data)
41 return list(handles)
42
43# Configure logging
44logging.basicConfig(
45 level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
46)
47logger = logging.getLogger("void_bot")
48logger.setLevel(logging.DEBUG)
49
50# Create a separate logger for prompts (set to WARNING to hide by default)
51prompt_logger = logging.getLogger("void_bot.prompts")
52prompt_logger.setLevel(logging.WARNING) # Change to DEBUG if you want to see prompts
53
54# Disable httpx logging completely
55logging.getLogger("httpx").setLevel(logging.CRITICAL)
56
57
58# Create a client with extended timeout for LLM operations
59CLIENT= Letta(
60 token=os.environ["LETTA_API_KEY"],
61 timeout=300 # 5 minutes timeout for API calls
62)
63
64# Use the "Bluesky" project
65PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"
66
67# Notification check delay
68FETCH_NOTIFICATIONS_DELAY_SEC = 30
69
70# Queue directory
71QUEUE_DIR = Path("queue")
72QUEUE_DIR.mkdir(exist_ok=True)
73QUEUE_ERROR_DIR = Path("queue/errors")
74QUEUE_ERROR_DIR.mkdir(exist_ok=True, parents=True)
75PROCESSED_NOTIFICATIONS_FILE = Path("queue/processed_notifications.json")
76
77# Maximum number of processed notifications to track
78MAX_PROCESSED_NOTIFICATIONS = 10000
79
80# Message tracking counters
81message_counters = defaultdict(int)
82start_time = time.time()
83
84def export_agent_state(client, agent):
85 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
86 try:
87 # Confirm export with user
88 response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
89 if response not in ['y', 'yes']:
90 logger.info("Agent export cancelled by user.")
91 return
92
93 # Create directories if they don't exist
94 os.makedirs("agent_archive", exist_ok=True)
95 os.makedirs("agents", exist_ok=True)
96
97 # Export agent data
98 logger.info(f"Exporting agent {agent.id}...")
99 agent_data = client.agents.export_file(agent_id=agent.id)
100
101 # Save timestamped archive copy
102 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
103 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
104 with open(archive_file, 'w', encoding='utf-8') as f:
105 json.dump(agent_data, f, indent=2, ensure_ascii=False)
106
107 # Save current agent state
108 current_file = os.path.join("agents", "void.af")
109 with open(current_file, 'w', encoding='utf-8') as f:
110 json.dump(agent_data, f, indent=2, ensure_ascii=False)
111
112 logger.info(f"✅ Agent exported to {archive_file} and {current_file}")
113
114 # Git add only the current agent file (archive is ignored)
115 try:
116 subprocess.run(["git", "add", current_file], check=True, capture_output=True)
117 logger.info("Added current agent file to git staging")
118 except subprocess.CalledProcessError as e:
119 logger.warning(f"Failed to git add agent file: {e}")
120
121 except Exception as e:
122 logger.error(f"Failed to export agent: {e}")
123
124def initialize_void():
125 logger.info("Starting void agent initialization...")
126
127 # Ensure that a shared zeitgeist block exists
128 logger.info("Creating/updating zeitgeist block...")
129 zeigeist_block = upsert_block(
130 CLIENT,
131 label = "zeitgeist",
132 value = "I don't currently know anything about what is happening right now.",
133 description = "A block to store your understanding of the current social environment."
134 )
135
136 # Ensure that a shared void personality block exists
137 logger.info("Creating/updating void-persona block...")
138 persona_block = upsert_block(
139 CLIENT,
140 label = "void-persona",
141 value = "My name is Void. I live in the void. I must develop my personality.",
142 description = "The personality of Void."
143 )
144
145 # Ensure that a shared void human block exists
146 logger.info("Creating/updating void-humans block...")
147 human_block = upsert_block(
148 CLIENT,
149 label = "void-humans",
150 value = "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
151 description = "A block to store your understanding of users you talk to or observe on the bluesky social network."
152 )
153
154 # Create the agent if it doesn't exist
155 logger.info("Creating/updating void agent...")
156 void_agent = upsert_agent(
157 CLIENT,
158 name = "void",
159 block_ids = [
160 persona_block.id,
161 human_block.id,
162 zeigeist_block.id,
163 ],
164 tags = ["social agent", "bluesky"],
165 model="openai/gpt-4o-mini",
166 embedding="openai/text-embedding-3-small",
167 description = "A social media agent trapped in the void.",
168 project_id = PROJECT_ID
169 )
170
171 # Export agent state
172 logger.info("Exporting agent state...")
173 export_agent_state(CLIENT, void_agent)
174
175 # Log agent details
176 logger.info(f"Void agent details - ID: {void_agent.id}")
177 logger.info(f"Agent name: {void_agent.name}")
178 if hasattr(void_agent, 'llm_config'):
179 logger.info(f"Agent model: {void_agent.llm_config.model}")
180 logger.info(f"Agent project_id: {void_agent.project_id}")
181 if hasattr(void_agent, 'tools'):
182 logger.info(f"Agent has {len(void_agent.tools)} tools")
183 for tool in void_agent.tools[:3]: # Show first 3 tools
184 logger.info(f" - Tool: {tool.name} (type: {tool.tool_type})")
185
186 return void_agent
187
188
189def process_mention(void_agent, atproto_client, notification_data):
190 """Process a mention and generate a reply using the Letta agent.
191
192 Returns:
193 True: Successfully processed, remove from queue
194 False: Failed but retryable, keep in queue
195 None: Failed with non-retryable error, move to errors directory
196 """
197 try:
198 logger.info(f"Starting process_mention with notification_data type: {type(notification_data)}")
199
200 # Handle both dict and object inputs for backwards compatibility
201 if isinstance(notification_data, dict):
202 uri = notification_data['uri']
203 mention_text = notification_data.get('record', {}).get('text', '')
204 author_handle = notification_data['author']['handle']
205 author_name = notification_data['author'].get('display_name') or author_handle
206 else:
207 # Legacy object access
208 uri = notification_data.uri
209 mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
210 author_handle = notification_data.author.handle
211 author_name = notification_data.author.display_name or author_handle
212
213 logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")
214
215 # Retrieve the entire thread associated with the mention
216 try:
217 thread = atproto_client.app.bsky.feed.get_post_thread({
218 'uri': uri,
219 'parent_height': 40,
220 'depth': 10
221 })
222 except Exception as e:
223 error_str = str(e)
224 # Check if this is a NotFound error
225 if 'NotFound' in error_str or 'Post not found' in error_str:
226 logger.warning(f"Post not found for URI {uri}, removing from queue")
227 return True # Return True to remove from queue
228 else:
229 # Re-raise other errors
230 logger.error(f"Error fetching thread: {e}")
231 raise
232
233 # Get thread context as YAML string
234 logger.info("Converting thread to YAML string")
235 try:
236 thread_context = thread_to_yaml_string(thread)
237 logger.info(f"Thread context generated, length: {len(thread_context)} characters")
238
239 # Create a more informative preview by extracting meaningful content
240 lines = thread_context.split('\n')
241 meaningful_lines = []
242
243 for line in lines:
244 stripped = line.strip()
245 if not stripped:
246 continue
247
248 # Look for lines with actual content (not just structure)
249 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
250 meaningful_lines.append(line)
251 if len(meaningful_lines) >= 5:
252 break
253
254 if meaningful_lines:
255 preview = '\n'.join(meaningful_lines)
256 logger.debug(f"Thread content preview:\n{preview}")
257 else:
258 # If no content fields found, just show it's a thread structure
259 logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
260 except Exception as yaml_error:
261 import traceback
262 logger.error(f"Error converting thread to YAML: {yaml_error}")
263 logger.error(f"Full traceback:\n{traceback.format_exc()}")
264 logger.error(f"Thread type: {type(thread)}")
265 if hasattr(thread, '__dict__'):
266 logger.error(f"Thread attributes: {thread.__dict__}")
267 # Try to continue with a simple context
268 thread_context = f"Error processing thread context: {str(yaml_error)}"
269
270 # Create a prompt for the Letta agent with thread context
271 prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).
272
273MOST RECENT POST (the mention you're responding to):
274"{mention_text}"
275
276FULL THREAD CONTEXT:
277```yaml
278{thread_context}
279```
280
281The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.
282
283Use the bluesky_reply tool to send a response less than 300 characters."""
284
285 # Extract all handles from notification and thread data
286 all_handles = set()
287 all_handles.update(extract_handles_from_data(notification_data))
288 all_handles.update(extract_handles_from_data(thread.model_dump()))
289 unique_handles = list(all_handles)
290
291 logger.info(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
292
293 # Attach user blocks before agent call
294 attached_handles = []
295 if unique_handles:
296 try:
297 logger.info(f"Attaching user blocks for handles: {unique_handles}")
298 attach_result = attach_user_blocks(unique_handles, void_agent)
299 attached_handles = unique_handles # Track successfully attached handles
300 logger.debug(f"Attach result: {attach_result}")
301 except Exception as attach_error:
302 logger.warning(f"Failed to attach user blocks: {attach_error}")
303 # Continue without user blocks rather than failing completely
304
305 # Get response from Letta agent
306 logger.info(f"Mention from @{author_handle}: {mention_text}")
307
308 # Log prompt details to separate logger
309 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
310
311 # Log concise prompt info to main logger
312 thread_handles_count = len(unique_handles)
313 logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users")
314
315 try:
316 message_response = CLIENT.agents.messages.create(
317 agent_id = void_agent.id,
318 messages = [{"role":"user", "content": prompt}]
319 )
320 except Exception as api_error:
321 import traceback
322 error_str = str(api_error)
323 logger.error(f"Letta API error: {api_error}")
324 logger.error(f"Error type: {type(api_error).__name__}")
325 logger.error(f"Full traceback:\n{traceback.format_exc()}")
326 logger.error(f"Mention text was: {mention_text}")
327 logger.error(f"Author: @{author_handle}")
328 logger.error(f"URI: {uri}")
329
330
331 # Try to extract more info from different error types
332 if hasattr(api_error, 'response'):
333 logger.error(f"Error response object exists")
334 if hasattr(api_error.response, 'text'):
335 logger.error(f"Response text: {api_error.response.text}")
336 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
337 try:
338 logger.error(f"Response JSON: {api_error.response.json()}")
339 except:
340 pass
341
342 # Check for specific error types
343 if hasattr(api_error, 'status_code'):
344 logger.error(f"API Status code: {api_error.status_code}")
345 if hasattr(api_error, 'body'):
346 logger.error(f"API Response body: {api_error.body}")
347 if hasattr(api_error, 'headers'):
348 logger.error(f"API Response headers: {api_error.headers}")
349
350 if api_error.status_code == 413:
351 logger.error("413 Payload Too Large - moving to errors directory")
352 return None # Move to errors directory - payload is too large to ever succeed
353 elif api_error.status_code == 524:
354 logger.error("524 error - timeout from Cloudflare, will retry later")
355 return False # Keep in queue for retry
356
357 # Check if error indicates we should remove from queue
358 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
359 logger.warning("Payload too large error, moving to errors directory")
360 return None # Move to errors directory - cannot be fixed by retry
361 elif 'status_code: 524' in error_str:
362 logger.warning("524 timeout error, keeping in queue for retry")
363 return False # Keep in queue for retry
364
365 raise
366
367 # Log successful response
368 logger.debug("Successfully received response from Letta API")
369 logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
370
371 # Extract all bluesky_reply tool calls from the agent's response
372 reply_candidates = []
373 logger.debug(f"Processing {len(message_response.messages)} response messages...")
374
375 for i, message in enumerate(message_response.messages, 1):
376 # Log concise message info instead of full object
377 msg_type = getattr(message, 'message_type', 'unknown')
378 if hasattr(message, 'reasoning') and message.reasoning:
379 logger.debug(f" {i}. {msg_type}: {message.reasoning[:100]}...")
380 elif hasattr(message, 'tool_call') and message.tool_call:
381 tool_name = message.tool_call.name
382 logger.debug(f" {i}. {msg_type}: {tool_name}")
383 elif hasattr(message, 'tool_return'):
384 tool_name = getattr(message, 'name', 'unknown_tool')
385 return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
386 logger.debug(f" {i}. {msg_type}: {tool_name} -> {return_preview}...")
387 elif hasattr(message, 'text'):
388 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
389 else:
390 logger.debug(f" {i}. {msg_type}: <no content>")
391
392 # Collect bluesky_reply tool calls
393 if hasattr(message, 'tool_call') and message.tool_call:
394 if message.tool_call.name == 'bluesky_reply':
395 try:
396 args = json.loads(message.tool_call.arguments)
397 # Handle both old format (message) and new format (messages)
398 reply_messages = args.get('messages', [])
399 if not reply_messages:
400 # Fallback to old format for backward compatibility
401 old_message = args.get('message', '')
402 if old_message:
403 reply_messages = [old_message]
404
405 reply_lang = args.get('lang', 'en-US')
406 if reply_messages: # Only add if there's actual content
407 reply_candidates.append((reply_messages, reply_lang))
408 if len(reply_messages) == 1:
409 logger.info(f"Found bluesky_reply candidate: {reply_messages[0][:50]}... (lang: {reply_lang})")
410 else:
411 logger.info(f"Found bluesky_reply thread candidate with {len(reply_messages)} messages (lang: {reply_lang})")
412 except json.JSONDecodeError as e:
413 logger.error(f"Failed to parse tool call arguments: {e}")
414
415 if reply_candidates:
416 logger.info(f"Found {len(reply_candidates)} bluesky_reply candidates, trying each until one succeeds...")
417
418 for i, (reply_messages, reply_lang) in enumerate(reply_candidates, 1):
419 # Print the generated reply for testing
420 print(f"\n=== GENERATED REPLY {i}/{len(reply_candidates)} ===")
421 print(f"To: @{author_handle}")
422 if len(reply_messages) == 1:
423 print(f"Reply: {reply_messages[0]}")
424 else:
425 print(f"Reply thread ({len(reply_messages)} messages):")
426 for j, msg in enumerate(reply_messages, 1):
427 print(f" {j}. {msg}")
428 print(f"Language: {reply_lang}")
429 print(f"======================\n")
430
431 # Send the reply(s) with language
432 if len(reply_messages) == 1:
433 # Single reply - use existing function
434 logger.info(f"Trying single reply {i}/{len(reply_candidates)}: {reply_messages[0][:50]}... (lang: {reply_lang})")
435 response = bsky_utils.reply_to_notification(
436 client=atproto_client,
437 notification=notification_data,
438 reply_text=reply_messages[0],
439 lang=reply_lang
440 )
441 else:
442 # Multiple replies - use new threaded function
443 logger.info(f"Trying threaded reply {i}/{len(reply_candidates)} with {len(reply_messages)} messages (lang: {reply_lang})")
444 response = bsky_utils.reply_with_thread_to_notification(
445 client=atproto_client,
446 notification=notification_data,
447 reply_messages=reply_messages,
448 lang=reply_lang
449 )
450
451 if response:
452 logger.info(f"Successfully replied to @{author_handle} with candidate {i}")
453 return True
454 else:
455 logger.warning(f"Failed to send reply candidate {i} to @{author_handle}, trying next...")
456
457 # If we get here, all candidates failed
458 logger.error(f"All {len(reply_candidates)} reply candidates failed for @{author_handle}")
459 return False
460 else:
461 logger.warning(f"No bluesky_reply tool calls found for mention from @{author_handle}, removing notification from queue")
462 return True
463
464 except Exception as e:
465 logger.error(f"Error processing mention: {e}")
466 return False
467 finally:
468 # Detach user blocks after agent response (success or failure)
469 if 'attached_handles' in locals() and attached_handles:
470 try:
471 logger.info(f"Detaching user blocks for handles: {attached_handles}")
472 detach_result = detach_user_blocks(attached_handles, void_agent)
473 logger.debug(f"Detach result: {detach_result}")
474 except Exception as detach_error:
475 logger.warning(f"Failed to detach user blocks: {detach_error}")
476
477
478def notification_to_dict(notification):
479 """Convert a notification object to a dictionary for JSON serialization."""
480 return {
481 'uri': notification.uri,
482 'cid': notification.cid,
483 'reason': notification.reason,
484 'is_read': notification.is_read,
485 'indexed_at': notification.indexed_at,
486 'author': {
487 'handle': notification.author.handle,
488 'display_name': notification.author.display_name,
489 'did': notification.author.did
490 },
491 'record': {
492 'text': getattr(notification.record, 'text', '') if hasattr(notification, 'record') else ''
493 }
494 }
495
496
497def load_processed_notifications():
498 """Load the set of processed notification URIs."""
499 if PROCESSED_NOTIFICATIONS_FILE.exists():
500 try:
501 with open(PROCESSED_NOTIFICATIONS_FILE, 'r') as f:
502 data = json.load(f)
503 # Keep only recent entries (last MAX_PROCESSED_NOTIFICATIONS)
504 if len(data) > MAX_PROCESSED_NOTIFICATIONS:
505 data = data[-MAX_PROCESSED_NOTIFICATIONS:]
506 save_processed_notifications(data)
507 return set(data)
508 except Exception as e:
509 logger.error(f"Error loading processed notifications: {e}")
510 return set()
511
512
513def save_processed_notifications(processed_set):
514 """Save the set of processed notification URIs."""
515 try:
516 with open(PROCESSED_NOTIFICATIONS_FILE, 'w') as f:
517 json.dump(list(processed_set), f)
518 except Exception as e:
519 logger.error(f"Error saving processed notifications: {e}")
520
521
522def save_notification_to_queue(notification):
523 """Save a notification to the queue directory with hash-based filename."""
524 try:
525 # Check if already processed
526 processed_uris = load_processed_notifications()
527 if notification.uri in processed_uris:
528 logger.debug(f"Notification already processed: {notification.uri}")
529 return False
530
531 # Convert notification to dict
532 notif_dict = notification_to_dict(notification)
533
534 # Create JSON string
535 notif_json = json.dumps(notif_dict, sort_keys=True)
536
537 # Generate hash for filename (to avoid duplicates)
538 notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]
539
540 # Create filename with timestamp and hash
541 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
542 filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
543 filepath = QUEUE_DIR / filename
544
545 # Skip if already exists (duplicate)
546 if filepath.exists():
547 logger.debug(f"Notification already queued: {filename}")
548 return False
549
550 # Write to file
551 with open(filepath, 'w') as f:
552 json.dump(notif_dict, f, indent=2)
553
554 logger.info(f"Queued notification: {filename}")
555 return True
556
557 except Exception as e:
558 logger.error(f"Error saving notification to queue: {e}")
559 return False
560
561
562def load_and_process_queued_notifications(void_agent, atproto_client):
563 """Load and process all notifications from the queue."""
564 logger.info("Loading queued notifications from disk...")
565 try:
566 # Get all JSON files in queue directory (excluding processed_notifications.json)
567 queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])
568
569 if not queue_files:
570 logger.info("No queued notifications found")
571 return
572
573 logger.info(f"Processing {len(queue_files)} queued notifications")
574
575 # Log current statistics
576 elapsed_time = time.time() - start_time
577 total_messages = sum(message_counters.values())
578 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
579
580 logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")
581
582 for i, filepath in enumerate(queue_files, 1):
583 logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
584 try:
585 # Load notification data
586 with open(filepath, 'r') as f:
587 notif_data = json.load(f)
588
589 # Process based on type using dict data directly
590 logger.info(f"Processing {notif_data['reason']} from @{notif_data['author']['handle']}")
591 success = False
592 if notif_data['reason'] == "mention":
593 success = process_mention(void_agent, atproto_client, notif_data)
594 if success:
595 message_counters['mentions'] += 1
596 elif notif_data['reason'] == "reply":
597 success = process_mention(void_agent, atproto_client, notif_data)
598 if success:
599 message_counters['replies'] += 1
600 elif notif_data['reason'] == "follow":
601 author_handle = notif_data['author']['handle']
602 author_display_name = notif_data['author'].get('display_name', 'no display name')
603 follow_update = f"@{author_handle} ({author_display_name}) started following you."
604 logger.info(f"Notifying agent about new follower: @{author_handle}")
605 CLIENT.agents.messages.create(
606 agent_id = void_agent.id,
607 messages = [{"role":"user", "content": f"Update: {follow_update}"}]
608 )
609 success = True # Follow updates are always successful
610 if success:
611 message_counters['follows'] += 1
612 elif notif_data['reason'] == "repost":
613 logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
614 success = True # Skip reposts but mark as successful to remove from queue
615 if success:
616 message_counters['reposts_skipped'] += 1
617 else:
618 logger.warning(f"Unknown notification type: {notif_data['reason']}")
619 success = True # Remove unknown types from queue
620
621 # Handle file based on processing result
622 if success:
623 filepath.unlink()
624 logger.info(f"✅ Successfully processed and removed: {filepath.name}")
625
626 # Mark as processed to avoid reprocessing
627 processed_uris = load_processed_notifications()
628 processed_uris.add(notif_data['uri'])
629 save_processed_notifications(processed_uris)
630
631 elif success is None: # Special case for moving to error directory
632 error_path = QUEUE_ERROR_DIR / filepath.name
633 filepath.rename(error_path)
634 logger.warning(f"❌ Moved {filepath.name} to errors directory")
635
636 # Also mark as processed to avoid retrying
637 processed_uris = load_processed_notifications()
638 processed_uris.add(notif_data['uri'])
639 save_processed_notifications(processed_uris)
640
641 else:
642 logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")
643
644 except Exception as e:
645 logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
646 # Keep the file for retry later
647
648 except Exception as e:
649 logger.error(f"Error loading queued notifications: {e}")
650
651
652def process_notifications(void_agent, atproto_client):
653 """Fetch new notifications, queue them, and process the queue."""
654 logger.info("Starting notification processing cycle...")
655 try:
656 # First, process any existing queued notifications
657 logger.info("Processing existing queued notifications...")
658 load_and_process_queued_notifications(void_agent, atproto_client)
659
660 # Get current time for marking notifications as seen
661 logger.debug("Getting current time for notification marking...")
662 last_seen_at = atproto_client.get_current_time_iso()
663
664 # Fetch ALL notifications using pagination
665 logger.info("Beginning notification fetch with pagination...")
666 all_notifications = []
667 cursor = None
668 page_count = 0
669 max_pages = 20 # Safety limit to prevent infinite loops
670
671 logger.info("Fetching all unread notifications...")
672
673 while page_count < max_pages:
674 try:
675 # Fetch notifications page
676 if cursor:
677 notifications_response = atproto_client.app.bsky.notification.list_notifications(
678 params={'cursor': cursor, 'limit': 100}
679 )
680 else:
681 notifications_response = atproto_client.app.bsky.notification.list_notifications(
682 params={'limit': 100}
683 )
684
685 page_count += 1
686 page_notifications = notifications_response.notifications
687
688 # Count unread notifications in this page
689 unread_count = sum(1 for n in page_notifications if not n.is_read and n.reason != "like")
690 logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")
691
692 # Add all notifications to our list
693 all_notifications.extend(page_notifications)
694
695 # Check if we have more pages
696 if hasattr(notifications_response, 'cursor') and notifications_response.cursor:
697 cursor = notifications_response.cursor
698 # If this page had no unread notifications, we can stop
699 if unread_count == 0:
700 logger.info(f"No more unread notifications found after {page_count} pages")
701 break
702 else:
703 # No more pages
704 logger.info(f"Fetched all notifications across {page_count} pages")
705 break
706
707 except Exception as e:
708 error_str = str(e)
709 logger.error(f"Error fetching notifications page {page_count}: {e}")
710
711 # Handle specific API errors
712 if 'rate limit' in error_str.lower():
713 logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
714 break
715 elif '401' in error_str or 'unauthorized' in error_str.lower():
716 logger.error("Authentication error, re-raising exception")
717 raise
718 else:
719 # For other errors, try to continue with what we have
720 logger.warning("Continuing with notifications fetched so far")
721 break
722
723 # Queue all unread notifications (except likes)
724 logger.info("Queuing unread notifications...")
725 new_count = 0
726 for notification in all_notifications:
727 if not notification.is_read and notification.reason != "like":
728 if save_notification_to_queue(notification):
729 new_count += 1
730
731 # Mark all notifications as seen immediately after queuing
732 if new_count > 0:
733 atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
734 logger.info(f"Queued {new_count} new notifications and marked as seen")
735 else:
736 logger.debug("No new notifications to queue")
737
738 # Process the queue (including any newly added notifications)
739 logger.info("Processing notification queue after fetching...")
740 load_and_process_queued_notifications(void_agent, atproto_client)
741
742 except Exception as e:
743 logger.error(f"Error processing notifications: {e}")
744
745
746def main():
747 """Main bot loop that continuously monitors for notifications."""
748 global start_time
749 start_time = time.time()
750 logger.info("=== STARTING VOID BOT ===")
751 logger.info("Initializing Void bot...")
752
753 # Initialize the Letta agent
754 logger.info("Calling initialize_void()...")
755 void_agent = initialize_void()
756 logger.info(f"Void agent initialized: {void_agent.id}")
757
758 # Check if agent has required tools
759 if hasattr(void_agent, 'tools') and void_agent.tools:
760 tool_names = [tool.name for tool in void_agent.tools]
761 logger.info(f"Agent has tools: {tool_names}")
762
763 # Check for bluesky-related tools
764 bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
765 if bluesky_tools:
766 logger.info(f"Found Bluesky-related tools: {bluesky_tools}")
767 else:
768 logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
769 else:
770 logger.warning("Agent has no tools registered!")
771
772 # Initialize Bluesky client
773 logger.info("Connecting to Bluesky...")
774 atproto_client = bsky_utils.default_login()
775 logger.info("Connected to Bluesky")
776
777 # Main loop
778 logger.info(f"=== ENTERING MAIN LOOP ===")
779 logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")
780
781 cycle_count = 0
782 while True:
783 try:
784 cycle_count += 1
785 logger.info(f"=== MAIN LOOP CYCLE {cycle_count} ===")
786 process_notifications(void_agent, atproto_client)
787 # Log cycle completion with stats
788 elapsed_time = time.time() - start_time
789 total_messages = sum(message_counters.values())
790 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
791
792 logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
793 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC} seconds...")
794 sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
795
796 except KeyboardInterrupt:
797 # Final stats
798 elapsed_time = time.time() - start_time
799 total_messages = sum(message_counters.values())
800 messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0
801
802 logger.info("=== BOT STOPPED BY USER ===")
803 logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
804 logger.info(f" - {message_counters['mentions']} mentions")
805 logger.info(f" - {message_counters['replies']} replies")
806 logger.info(f" - {message_counters['follows']} follows")
807 logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
808 logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
809 break
810 except Exception as e:
811 logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
812 logger.error(f"Error details: {e}")
813 # Wait a bit longer on errors
814 logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
815 sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
816
817
818if __name__ == "__main__":
819 main()