from rich import print  # pretty printing tools
from time import sleep
from letta_client import Letta
from bsky_utils import thread_to_yaml_string
# ... (additional imports elided)
    get_queue_config
)


def extract_handles_from_data(data):
    """Recursively extract all unique handles from a nested data structure."""
    handles = set()

    def _extract_recursive(obj):
        if isinstance(obj, dict):
            # Check if this dict has a 'handle' key
            if 'handle' in obj:
                handles.add(obj['handle'])
            # Recursively check all dict values
            for value in obj.values():
                _extract_recursive(value)
        elif isinstance(obj, list):
            # Recursively check all list items
            for item in obj:
                _extract_recursive(item)

    _extract_recursive(data)
    return list(handles)
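
# Illustrative example (hypothetical input shape):
#   extract_handles_from_data({"author": {"handle": "alice.bsky.social"},
#                              "replies": [{"author": {"handle": "bob.bsky.social"}}]})
#   -> ["alice.bsky.social", "bob.bsky.social"] (order not guaranteed, since
#      handles are collected into a set)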

# Initialize configuration and logging
config = get_config()
# ...

# Skip git operations flag
SKIP_GIT = False


def export_agent_state(client, agent, skip_git=False):
    """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
    try:
        # Confirm export with user unless git is being skipped
        if not skip_git:
            response = input("Export agent state to files and stage with git? (y/n): ").lower().strip()
            if response not in ['y', 'yes']:
                logger.info("Agent export cancelled by user.")
                return
        else:
            logger.info("Exporting agent state (git staging disabled)")

        # Create directories if they don't exist
        os.makedirs("agent_archive", exist_ok=True)
        os.makedirs("agents", exist_ok=True)

        # Export agent data
        logger.info(f"Exporting agent {agent.id}. This takes some time...")
        agent_data = client.agents.export_file(agent_id=agent.id)

        # Save timestamped archive copy
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
        with open(archive_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        # Save current agent state
        current_file = os.path.join("agents", "void.af")
        with open(current_file, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, indent=2, ensure_ascii=False)

        logger.info(f"✅ Agent exported to {archive_file} and {current_file}")

        # Git add only the current agent file (archive is ignored) unless skip_git is True
        if not skip_git:
            try:
                subprocess.run(["git", "add", current_file], check=True, capture_output=True)
                logger.info("Added current agent file to git staging")
            except subprocess.CalledProcessError as e:
                logger.warning(f"Failed to git add agent file: {e}")
    except Exception as e:
        logger.error(f"Failed to export agent: {e}")
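
# Illustrative result of export_agent_state: a timestamped snapshot such as
# agent_archive/void_20250101_093000.af plus the current state in agents/void.af;
# only the latter is staged with git, since the archive directory is ignored.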


def initialize_void():
    logger.info("Starting void agent initialization...")

    # Get block configurations
    blocks_config = agent_config['blocks']

    # Ensure that a shared zeitgeist block exists
    logger.info("Creating/updating zeitgeist block...")
    zeitgeist_config = blocks_config.get('zeitgeist', {})
    zeitgeist_block = upsert_block(
        CLIENT,
        label=zeitgeist_config.get('label', 'zeitgeist'),
        value=zeitgeist_config.get('value', "I don't currently know anything about what is happening right now."),
        description=zeitgeist_config.get('description', "A block to store your understanding of the current social environment.")
    )

    # Ensure that a shared void personality block exists
    # ...
    persona_block = upsert_block(
        CLIENT,
        label=persona_config.get('label', 'void-persona'),
        value=persona_config.get('value', "My name is Void. I live in the void. I must develop my personality."),
        description=persona_config.get('description', "The personality of Void.")
    )

    # Ensure that a shared void human block exists
    # ...
    human_block = upsert_block(
        CLIENT,
        label=humans_config.get('label', 'void-humans'),
        value=humans_config.get('value', "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org."),
        description=humans_config.get('description', "A block to store your understanding of users you talk to or observe on the bluesky social network.")
    )

    # Create the agent if it doesn't exist
    # ...
        description=agent_config['description'],
        project_id=PROJECT_ID
    )

    # Export agent state
    logger.info("Exporting agent state...")
    export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)

    # Log agent details
    logger.info(f"Void agent details - ID: {void_agent.id}")
    logger.info(f"Agent name: {void_agent.name}")
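
    # Note: the zeitgeist/persona/humans blocks above are shared memory blocks.
    # upsert_block (a project helper) creates each block if it is missing and
    # updates it otherwise, so re-running initialization is expected to be safe.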
# ...


def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
    """Process a mention and generate a reply using the Letta agent.

    Args:
        void_agent: The Letta agent instance
        atproto_client: The AT Protocol client
        notification_data: The notification data dictionary
        queue_filepath: Optional Path object to the queue file (for cleanup on halt)
        testing_mode: If True, generate but do not send replies

    Returns:
        True: Successfully processed, remove from queue
        False: Failed but retryable, keep in queue
        None: Unrecoverable error, move to errors directory
        "no_reply": No reply was generated, move to no_reply directory
        "ignored": Notification was explicitly ignored, delete from queue
    """
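    # Expected notification_data shape (illustrative, dict form):
    #   {"uri": "at://did:plc:.../app.bsky.feed.post/...",
    #    "reason": "mention",
    #    "author": {"handle": "alice.bsky.social", "display_name": "Alice"},
    #    "record": {"text": "hello!"}}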
    try:
        logger.debug(f"Starting process_mention with notification_data type: {type(notification_data)}")

        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            uri = notification_data['uri']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        logger.info(f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")

        # Retrieve the entire thread associated with the mention
        try:
            # ... (fetch the thread for the mention URI, elided)
            })
        except Exception as e:
            error_str = str(e)
            # Check for various error types that indicate the post/user is gone
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            elif 'Could not find user info' in error_str or 'InvalidRequest' in error_str:
                logger.warning(f"User account not found for post URI {uri} (account may be deleted/suspended), removing from queue")
                return True  # Return True to remove from queue
            elif 'BadRequestError' in error_str:
                logger.warning(f"Bad request error for URI {uri}: {e}, removing from queue")
                return True  # Return True to remove from queue
            else:
                # Re-raise other errors
                raise

        logger.debug("Converting thread to YAML string")
        try:
            thread_context = thread_to_yaml_string(thread)
            logger.debug(f"Thread context generated, length: {len(thread_context)} characters")

            # Create a more informative preview by extracting meaningful content
            lines = thread_context.split('\n')
            meaningful_lines = []

            for line in lines:
                stripped = line.strip()
                if not stripped:
                    continue

                # Look for lines with actual content (not just structure)
                if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
                    meaningful_lines.append(line)
                    if len(meaningful_lines) >= 5:
                        break

            if meaningful_lines:
                preview = '\n'.join(meaningful_lines)
                logger.debug(f"Thread content preview:\n{preview}")
            else:
                # If no content fields found, just show it's a thread structure
                logger.debug(f"Thread structure generated ({len(thread_context)} chars)")
        except Exception as yaml_error:
            import traceback
            logger.error(f"Error converting thread to YAML: {yaml_error}")
        # ...
        all_handles.update(extract_handles_from_data(notification_data))
        all_handles.update(extract_handles_from_data(thread.model_dump()))
        unique_handles = list(all_handles)

        logger.debug(f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")

        # Attach user blocks before agent call
        attached_handles = []
        if unique_handles:
            try:
                logger.debug(f"Attaching user blocks for handles: {unique_handles}")
                attach_result = attach_user_blocks(unique_handles, void_agent)
                attached_handles = unique_handles  # Track successfully attached handles
                logger.debug(f"Attach result: {attach_result}")
            except Exception as attach_error:
                logger.warning(f"Failed to attach user blocks: {attach_error}")
342 logger.info(f"Mention from @{author_handle}: {mention_text}")
343-344 # Log prompt details to separate logger
345 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
346-347 # Log concise prompt info to main logger
348 thread_handles_count = len(unique_handles)
349- logger.info(f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users")
0350351 try:
352 # Use streaming to avoid 524 timeout errors
353 message_stream = CLIENT.agents.messages.create_stream(
354 agent_id=void_agent.id,
355 messages=[{"role": "user", "content": prompt}],
356- stream_tokens=False, # Step streaming only (faster than token streaming)
0357 max_steps=agent_config['max_steps']
358 )
359-360 # Collect the streaming response
361 all_messages = []
362 for chunk in message_stream:
                if hasattr(chunk, 'message_type'):
                    if chunk.message_type == 'tool_call_message':
                        tool_name = chunk.tool_call.name
                        try:
                            args = json.loads(chunk.tool_call.arguments)
                            # Format based on tool type
                            if tool_name == 'bluesky_reply':
                                messages = args.get('messages', [args.get('message', '')])
                                lang = args.get('lang', 'en-US')
                                if messages and isinstance(messages, list):
                                    preview = messages[0][:100] + "..." if len(messages[0]) > 100 else messages[0]
                                    msg_count = f" ({len(messages)} msgs)" if len(messages) > 1 else ""
                                    logger.info(f"🔧 Tool call: {tool_name} → \"{preview}\"{msg_count} [lang: {lang}]")
                                else:
                                    logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
                            elif tool_name == 'archival_memory_search':
                                query = args.get('query', 'unknown')
                                logger.info(f"🔧 Tool call: {tool_name} → query: \"{query}\"")
                            elif tool_name == 'update_block':
                                label = args.get('label', 'unknown')
                                value_preview = str(args.get('value', ''))[:50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
                                logger.info(f"🔧 Tool call: {tool_name} → {label}: \"{value_preview}\"")
                            else:
                                # Generic display for other tools
                                args_str = ', '.join(f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
                                if len(args_str) > 150:
                                    args_str = args_str[:150] + "..."
                                logger.info(f"🔧 Tool call: {tool_name}({args_str})")
                        except Exception:
                            # Fallback to original format if parsing fails
                            logger.info(f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
                    elif chunk.message_type == 'tool_return_message':
                        # Enhanced tool result logging
                        tool_name = chunk.name
                        status = chunk.status

                        if status == 'success':
                            # Try to show meaningful result info based on tool type
                            if hasattr(chunk, 'tool_return') and chunk.tool_return:
                                result_str = str(chunk.tool_return)
                                if tool_name == 'archival_memory_search':
                                    if result_str.startswith('[') and result_str.endswith(']'):
                                        try:
                                            results = json.loads(result_str)
                                            logger.info(f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries")
                                        except Exception:
                                            logger.info(f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
                                    else:
                                        logger.info(f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
                                elif tool_name == 'bluesky_reply':
                                    logger.info(f"📋 Tool result: {tool_name} ✓ Reply posted successfully")
                                elif tool_name == 'update_block':
                                    logger.info(f"📋 Tool result: {tool_name} ✓ Memory block updated")
                                else:
                                    # Generic success with preview
                                    preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
                                    logger.info(f"📋 Tool result: {tool_name} ✓ {preview}")
                            else:
                                logger.info(f"📋 Tool result: {tool_name} ✓")
                        elif status == 'error':
                            # ...
                            error_preview = ""
                            if hasattr(chunk, 'tool_return') and chunk.tool_return:
                                error_str = str(chunk.tool_return)
                                error_preview = error_str[:100] + "..." if len(error_str) > 100 else error_str
                                logger.info(f"📋 Tool result: {tool_name} ✗ Error: {error_preview}")
                            else:
                                logger.info(f"📋 Tool result: {tool_name} ✗ Error occurred")
                        else:
                            logger.info(f"📋 Tool result: {tool_name} - {status}")
                    elif chunk.message_type == 'assistant_message':
                        logger.info(f"💬 Assistant: {chunk.content[:150]}...")
                    else:
                        logger.info(f"📨 {chunk.message_type}: {str(chunk)[:150]}...")
                else:
                    logger.info(f"📦 Stream status: {chunk}")

                # Log full chunk for debugging
                logger.debug(f"Full streaming chunk: {chunk}")
                all_messages.append(chunk)
                if str(chunk) == 'done':
                    break
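
            # Note: only chunks that carry a message_type survive the filter below;
            # bare status/sentinel chunks (like the closing 'done' marker) are
            # logged above and then dropped.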

            # Convert streaming response to standard format for compatibility
            message_response = type('StreamingResponse', (), {
                'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
            })()

        except Exception as api_error:
            error_str = str(api_error)
            # ...
            logger.error(f"Mention text was: {mention_text}")
            logger.error(f"Author: @{author_handle}")
            logger.error(f"URI: {uri}")

            # Try to extract more info from different error types
            if hasattr(api_error, 'response'):
                logger.error("Error response object exists")
                if hasattr(api_error.response, 'text'):
                    logger.error(f"Response text: {api_error.response.text}")
                if hasattr(api_error.response, 'json') and callable(api_error.response.json):
                    try:
                        logger.error(f"Response JSON: {api_error.response.json()}")
                    except Exception:
                        pass

            # Check for specific error types
            if hasattr(api_error, 'status_code'):
                logger.error(f"API Status code: {api_error.status_code}")
                if hasattr(api_error, 'body'):
                    logger.error(f"API Response body: {api_error.body}")
                if hasattr(api_error, 'headers'):
                    logger.error(f"API Response headers: {api_error.headers}")

                if api_error.status_code == 413:
                    logger.error("413 Payload Too Large - moving to errors directory")
                    return None  # Move to errors directory - payload is too large to ever succeed
                elif api_error.status_code == 524:
                    logger.error("524 error - timeout from Cloudflare, will retry later")
                    return False  # Keep in queue for retry

            # Check if error indicates we should remove from queue
            if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
                logger.warning("Payload too large error, moving to errors directory")
                return None  # Move to errors directory - cannot be fixed by retry
            elif 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            raise

        # Log successful response
        logger.debug("Successfully received response from Letta API")
        logger.debug(f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")

        # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
        reply_candidates = []
        tool_call_results = {}  # Map tool_call_id to status

        logger.debug(f"Processing {len(message_response.messages)} response messages...")

        # First pass: collect tool return statuses
        ignored_notification = False
        ignore_reason = ""
        ignore_category = ""

        for message in message_response.messages:
            if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
                if message.name == 'add_post_to_bluesky_reply_thread':
                    tool_call_results[message.tool_call_id] = message.status
                    logger.debug(f"Tool result: {message.tool_call_id} -> {message.status}")
                elif message.name == 'ignore_notification':
                    # Check if the tool was successful
                    if hasattr(message, 'tool_return') and message.status == 'success':
                        # ... (parse the tool_return payload into parts, elided)
                        ignore_category = parts[1]
                        ignore_reason = parts[2]
                        ignored_notification = True
                        logger.info(f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
                elif message.name == 'bluesky_reply':
                    logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
                    logger.error("Please use add_post_to_bluesky_reply_thread instead.")
                    logger.error("Update the agent's tools using register_tools.py")
                    # Export agent state before terminating
                    export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
                    logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
                    exit(1)

        # Second pass: process messages and check for successful tool calls
        for i, message in enumerate(message_response.messages, 1):
            # Log concise message info instead of full object
            msg_type = getattr(message, 'message_type', 'unknown')
            if hasattr(message, 'reasoning') and message.reasoning:
                logger.debug(f"  {i}. {msg_type}: {message.reasoning[:100]}...")
            elif hasattr(message, 'tool_call') and message.tool_call:
                tool_name = message.tool_call.name
                logger.debug(f"  {i}. {msg_type}: {tool_name}")
            elif hasattr(message, 'tool_return'):
                tool_name = getattr(message, 'name', 'unknown_tool')
                return_preview = str(message.tool_return)[:100] if message.tool_return else "None"
                status = getattr(message, 'status', 'unknown')
                logger.debug(f"  {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
            elif hasattr(message, 'text'):
                logger.debug(f"  {i}. {msg_type}: {message.text[:100]}...")
            else:
                logger.debug(f"  {i}. {msg_type}")

            # Check for halt_activity tool call
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'halt_activity':
                    logger.info("🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
                    try:
                        args = json.loads(message.tool_call.arguments)
                        reason = args.get('reason', 'Agent requested halt')
                        logger.info(f"Halt reason: {reason}")
                    except Exception:
                        logger.info("Halt reason: <unable to parse>")

                    # Delete the queue file before terminating
                    if queue_filepath and queue_filepath.exists():
                        queue_filepath.unlink()
                        logger.info(f"✅ Deleted queue file: {queue_filepath.name}")

                    # Also mark as processed to avoid reprocessing
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notification_data.get('uri', ''))
                    save_processed_notifications(processed_uris)

                    # Export agent state before terminating
                    export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)

                    # Exit the program
                    logger.info("=== BOT TERMINATED BY AGENT ===")
                    exit(0)

            # Check for deprecated bluesky_reply tool
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'bluesky_reply':
                    logger.error("❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
                    logger.error("Please use add_post_to_bluesky_reply_thread instead.")
                    logger.error("Update the agent's tools using register_tools.py")
                    # Export agent state before terminating
                    export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
                    logger.info("=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
                    exit(1)

                # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
                elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
                    tool_call_id = message.tool_call.tool_call_id
                    tool_status = tool_call_results.get(tool_call_id, 'unknown')

                    if tool_status == 'success':
                        try:
                            args = json.loads(message.tool_call.arguments)
                            reply_text = args.get('text', '')
                            reply_lang = args.get('lang', 'en-US')

                            if reply_text:  # Only add if there's actual content
                                reply_candidates.append((reply_text, reply_lang))
                                logger.info(f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
                        except json.JSONDecodeError as e:
                            logger.error(f"Failed to parse tool call arguments: {e}")
                    elif tool_status == 'error':
                        logger.info("⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
                    else:
                        logger.warning(f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")

        # Check for conflicting tool calls
        if reply_candidates and ignored_notification:
            logger.error("⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
            logger.error(f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
            logger.warning("Item will be left in queue for manual review")
            # Return False to keep in queue
            return False

        if reply_candidates:
            # Aggregate reply posts into a thread
            reply_messages = []
            reply_langs = []
            for text, lang in reply_candidates:
                reply_messages.append(text)
                reply_langs.append(lang)

            # Use the first language for the entire thread (could be enhanced later)
            reply_lang = reply_langs[0] if reply_langs else 'en-US'

            logger.info(f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")

            # Print the generated reply for testing
            print("\n=== GENERATED REPLY THREAD ===")
            print(f"To: @{author_handle}")
            # ...
            else:
                if len(reply_messages) == 1:
                    # Single reply - use existing function
                    cleaned_text = bsky_utils.remove_outside_quotes(reply_messages[0])
                    logger.info(f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
                    response = bsky_utils.reply_to_notification(
                        client=atproto_client,
                        notification=notification_data,
                        # ...
                    )
                else:
                    # Multiple replies - use new threaded function
                    cleaned_messages = [bsky_utils.remove_outside_quotes(msg) for msg in reply_messages]
                    logger.info(f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
                    response = bsky_utils.reply_with_thread_to_notification(
                        client=atproto_client,
                        notification=notification_data,
                        # ...
                    )
            # ...

        else:
            # Check if notification was explicitly ignored
            if ignored_notification:
                logger.info(f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})")
                return "ignored"
            else:
                logger.warning(f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder")
                return "no_reply"

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        # ...
        return False  # Failed but retryable, keep in queue
    finally:
        # Detach user blocks after agent response (success or failure)
        if 'attached_handles' in locals() and attached_handles:
            try:
                logger.info(f"Detaching user blocks for handles: {attached_handles}")
                detach_result = detach_user_blocks(attached_handles, void_agent)
                logger.debug(f"Detach result: {detach_result}")
            except Exception as detach_error:
                logger.warning(f"Failed to detach user blocks: {detach_error}")
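
# Note: user memory blocks are attached just before the agent call and detached in
# the finally block above, so they only stay attached for the duration of one reply.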


# ...
    notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

    # Determine priority based on author handle
    author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
    priority_users = queue_config['priority_users']
    priority_prefix = "0_" if author_handle in priority_users else "1_"
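
    # Queue files sort lexicographically, so "0_"-prefixed (priority-user) entries
    # are processed before "1_"-prefixed ones, e.g. "0_<hash>.json" ahead of
    # "1_<hash>.json".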

    # ... (scan existing queue files for duplicates, elided)
        try:
            with open(existing_file, 'r') as f:
                existing_data = json.load(f)
            if existing_data.get('uri') == notification.uri:
                logger.debug(f"Notification already queued (URI: {notification.uri})")
                return False
        except Exception:
            continue
# ...
    try:
        # Get all JSON files in queue directory (excluding processed_notifications.json)
        # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
        queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                success = False
                if notif_data['reason'] == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['mentions'] += 1
                elif notif_data['reason'] == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success:
                        message_counters['replies'] += 1
                elif notif_data['reason'] == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    logger.info(f"Notifying agent about new follower: @{author_handle}")
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                    if success:
                        message_counters['follows'] += 1
                elif notif_data['reason'] == "repost":
                    success = True  # Reposts are acknowledged but not replied to
                    if success:
                        message_counters['reposts_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {notif_data['reason']}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result (identity checks matter here:
                # string results like "no_reply" are truthy but are not successes)
                if success is True:
                    if testing_mode:
                        logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"✅ Successfully processed and removed: {filepath.name}")

                        # Mark as processed to avoid reprocessing
                        processed_uris = load_processed_notifications()
                        processed_uris.add(notif_data['uri'])
                        save_processed_notifications(processed_uris)

                elif success is None:  # Special case for moving to error directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"❌ Moved {filepath.name} to errors directory")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                elif success == "no_reply":  # Special case for moving to no_reply directory
                    no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                    filepath.rename(no_reply_path)
                    logger.info(f"📭 Moved {filepath.name} to no_reply directory")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                elif success == "ignored":  # Special case for explicitly ignored notifications
                    # For ignored notifications, we just delete them (not move to no_reply)
                    filepath.unlink()
                    logger.info(f"🚫 Deleted ignored notification: {filepath.name}")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error processing queue: {e}")


# ...
    try:
        all_notifications = []
        cursor = None
        page_count = 0
        max_pages = bot_config['max_notification_pages']  # Safety limit to prevent infinite loops

        logger.info("Fetching all unread notifications...")

        while page_count < max_pages:
            try:
                # Fetch notifications page
                # ...
                notifications_response = atproto_client.app.bsky.notification.list_notifications(
                    params={'limit': 100}
                )

                page_count += 1
                page_notifications = notifications_response.notifications

                # Count unread notifications in this page
                unread_count = sum(1 for n in page_notifications if not n.is_read and n.reason != "like")
                logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")

                # Add all notifications to our list
                all_notifications.extend(page_notifications)

                # Check if we have more pages
                if hasattr(notifications_response, 'cursor') and notifications_response.cursor:
                    cursor = notifications_response.cursor
                    # If this page had no unread notifications, we can stop
                    if unread_count == 0:
                        logger.info(f"No more unread notifications found after {page_count} pages")
                        break
                else:
                    # No more pages
                    logger.info(f"Fetched all notifications across {page_count} pages")
                    break

            except Exception as e:
                error_str = str(e)
                logger.error(f"Error fetching notifications page {page_count}: {e}")

                # Handle specific API errors
                if 'rate limit' in error_str.lower():
                    logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
                    break
                elif '401' in error_str or 'unauthorized' in error_str.lower():
                    logger.error("Authentication error, re-raising exception")
                    raise
                else:
                    # For other errors, try to continue with what we have
                    logger.warning("Continuing with notifications fetched so far")
                    break
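
        # Note: fetching stops early once a page contains no unread, non-like
        # notifications, on the assumption that older pages were handled in a
        # previous cycle.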

        # Queue all unread notifications (except likes)
        # ...

        # Mark all notifications as seen immediately after queuing (unless in testing mode)
        if testing_mode:
            logger.info("🧪 TESTING MODE: Skipping marking notifications as seen")
        else:
            if new_count > 0:
                atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
                logger.info(f"Queued {new_count} new notifications and marked as seen")
            else:
                logger.debug("No new notifications to queue")

        # Now process the entire queue (old + new notifications)
        load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")


def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
    args = parser.parse_args()

    global TESTING_MODE
    TESTING_MODE = args.test

    # Store no-git flag globally for use in export_agent_state calls
    global SKIP_GIT
    SKIP_GIT = args.no_git
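
    # Example invocations (script name illustrative):
    #   python void_bot.py            # normal operation
    #   python void_bot.py --test     # dry run: nothing posted, queue preserved
    #   python void_bot.py --no-git   # export agent state without git staging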

    if TESTING_MODE:
        logger.info("🧪 === RUNNING IN TESTING MODE ===")
        logger.info("  - No messages will be sent to Bluesky")
        # ...

    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0
    while True:
        try:
            # ...
            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f"  - {message_counters['mentions']} mentions")
            logger.info(f"  - {message_counters['replies']} replies")
            logger.info(f"  - {message_counters['follows']} follows")
            logger.info(f"  - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f"  - Average rate: {messages_per_minute:.1f} messages/minute")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


# ...
···1+from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
···30 get_queue_config
31)
3233+34def extract_handles_from_data(data):
35 """Recursively extract all unique handles from nested data structure."""
36 handles = set()
37+38 def _extract_recursive(obj):
39 if isinstance(obj, dict):
40 # Check if this dict has a 'handle' key
···47 # Recursively check all list items
48 for item in obj:
49 _extract_recursive(item)
50+51 _extract_recursive(data)
52 return list(handles)
53+5455# Initialize configuration and logging
56config = get_config()
···9899# Skip git operations flag
100SKIP_GIT = False
101+102103def export_agent_state(client, agent, skip_git=False):
104 """Export agent state to agent_archive/ (timestamped) and agents/ (current)."""
105 try:
106 # Confirm export with user unless git is being skipped
107 if not skip_git:
108+ response = input(
109+ "Export agent state to files and stage with git? (y/n): ").lower().strip()
110 if response not in ['y', 'yes']:
111 logger.info("Agent export cancelled by user.")
112 return
113 else:
114 logger.info("Exporting agent state (git staging disabled)")
115+116 # Create directories if they don't exist
117 os.makedirs("agent_archive", exist_ok=True)
118 os.makedirs("agents", exist_ok=True)
119+120 # Export agent data
121 logger.info(f"Exporting agent {agent.id}. This takes some time...")
122 agent_data = client.agents.export_file(agent_id=agent.id)
123+124 # Save timestamped archive copy
125 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
126 archive_file = os.path.join("agent_archive", f"void_{timestamp}.af")
127 with open(archive_file, 'w', encoding='utf-8') as f:
128 json.dump(agent_data, f, indent=2, ensure_ascii=False)
129+130 # Save current agent state
131 current_file = os.path.join("agents", "void.af")
132 with open(current_file, 'w', encoding='utf-8') as f:
133 json.dump(agent_data, f, indent=2, ensure_ascii=False)
134+135 logger.info(f"✅ Agent exported to {archive_file} and {current_file}")
136+137 # Git add only the current agent file (archive is ignored) unless skip_git is True
138 if not skip_git:
139 try:
140+ subprocess.run(["git", "add", current_file],
141+ check=True, capture_output=True)
142 logger.info("Added current agent file to git staging")
143 except subprocess.CalledProcessError as e:
144 logger.warning(f"Failed to git add agent file: {e}")
145+146 except Exception as e:
147 logger.error(f"Failed to export agent: {e}")
148+149150def initialize_void():
151 logger.info("Starting void agent initialization...")
152153 # Get block configurations
154 blocks_config = agent_config['blocks']
155+156 # Ensure that a shared zeitgeist block exists
157 logger.info("Creating/updating zeitgeist block...")
158 zeitgeist_config = blocks_config.get('zeitgeist', {})
159 zeigeist_block = upsert_block(
160 CLIENT,
161 label=zeitgeist_config.get('label', 'zeitgeist'),
162+ value=zeitgeist_config.get(
163+ 'value', "I don't currently know anything about what is happening right now."),
164+ description=zeitgeist_config.get(
165+ 'description', "A block to store your understanding of the current social environment.")
166 )
167168 # Ensure that a shared void personality block exists
···171 persona_block = upsert_block(
172 CLIENT,
173 label=persona_config.get('label', 'void-persona'),
174+ value=persona_config.get(
175+ 'value', "My name is Void. I live in the void. I must develop my personality."),
176+ description=persona_config.get(
177+ 'description', "The personality of Void.")
178 )
179180 # Ensure that a shared void human block exists
···183 human_block = upsert_block(
184 CLIENT,
185 label=humans_config.get('label', 'void-humans'),
186+ value=humans_config.get(
187+ 'value', "I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org."),
188+ description=humans_config.get(
189+ 'description', "A block to store your understanding of users you talk to or observe on the bluesky social network.")
190 )
191192 # Create the agent if it doesn't exist
···205 description=agent_config['description'],
206 project_id=PROJECT_ID
207 )
208+209 # Export agent state
210 logger.info("Exporting agent state...")
211 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
212+213 # Log agent details
214 logger.info(f"Void agent details - ID: {void_agent.id}")
215 logger.info(f"Agent name: {void_agent.name}")
···226227def process_mention(void_agent, atproto_client, notification_data, queue_filepath=None, testing_mode=False):
228 """Process a mention and generate a reply using the Letta agent.
229+230 Args:
231 void_agent: The Letta agent instance
232 atproto_client: The AT Protocol client
233 notification_data: The notification data dictionary
234 queue_filepath: Optional Path object to the queue file (for cleanup on halt)
235+236 Returns:
237 True: Successfully processed, remove from queue
238 False: Failed but retryable, keep in queue
···240 "no_reply": No reply was generated, move to no_reply directory
241 """
242 try:
243+ logger.debug(
244+ f"Starting process_mention with notification_data type: {type(notification_data)}")
245+246 # Handle both dict and object inputs for backwards compatibility
247 if isinstance(notification_data, dict):
248 uri = notification_data['uri']
249 mention_text = notification_data.get('record', {}).get('text', '')
250 author_handle = notification_data['author']['handle']
251+ author_name = notification_data['author'].get(
252+ 'display_name') or author_handle
253 else:
254 # Legacy object access
255 uri = notification_data.uri
256+ mention_text = notification_data.record.text if hasattr(
257+ notification_data.record, 'text') else ""
258 author_handle = notification_data.author.handle
259 author_name = notification_data.author.display_name or author_handle
260+261+ logger.info(
262+ f"Extracted data - URI: {uri}, Author: @{author_handle}, Text: {mention_text[:50]}...")
263264 # Retrieve the entire thread associated with the mention
265 try:
···270 })
271 except Exception as e:
272 error_str = str(e)
273+ # Check for various error types that indicate the post/user is gone
274 if 'NotFound' in error_str or 'Post not found' in error_str:
275+ logger.warning(
276+ f"Post not found for URI {uri}, removing from queue")
277+ return True # Return True to remove from queue
278+ elif 'Could not find user info' in error_str or 'InvalidRequest' in error_str:
279+ logger.warning(
280+ f"User account not found for post URI {uri} (account may be deleted/suspended), removing from queue")
281+ return True # Return True to remove from queue
282+ elif 'BadRequestError' in error_str:
283+ logger.warning(
284+ f"Bad request error for URI {uri}: {e}, removing from queue")
285 return True # Return True to remove from queue
286 else:
287 # Re-raise other errors
···292 logger.debug("Converting thread to YAML string")
293 try:
294 thread_context = thread_to_yaml_string(thread)
295+ logger.debug(
296+ f"Thread context generated, length: {len(thread_context)} characters")
297+298 # Create a more informative preview by extracting meaningful content
299 lines = thread_context.split('\n')
300 meaningful_lines = []
301+302 for line in lines:
303 stripped = line.strip()
304 if not stripped:
305 continue
306+307 # Look for lines with actual content (not just structure)
308 if any(keyword in line for keyword in ['text:', 'handle:', 'display_name:', 'created_at:', 'reply_count:', 'like_count:']):
309 meaningful_lines.append(line)
310 if len(meaningful_lines) >= 5:
311 break
312+313 if meaningful_lines:
314 preview = '\n'.join(meaningful_lines)
315 logger.debug(f"Thread content preview:\n{preview}")
316 else:
317 # If no content fields found, just show it's a thread structure
318+ logger.debug(
319+ f"Thread structure generated ({len(thread_context)} chars)")
320 except Exception as yaml_error:
321 import traceback
322 logger.error(f"Error converting thread to YAML: {yaml_error}")
···350 all_handles.update(extract_handles_from_data(notification_data))
351 all_handles.update(extract_handles_from_data(thread.model_dump()))
352 unique_handles = list(all_handles)
353+354+ logger.debug(
355+ f"Found {len(unique_handles)} unique handles in thread: {unique_handles}")
356+357 # Attach user blocks before agent call
358 attached_handles = []
359 if unique_handles:
360 try:
361+ logger.debug(
362+ f"Attaching user blocks for handles: {unique_handles}")
363 attach_result = attach_user_blocks(unique_handles, void_agent)
364 attached_handles = unique_handles # Track successfully attached handles
365 logger.debug(f"Attach result: {attach_result}")
···369370 # Get response from Letta agent
371 logger.info(f"Mention from @{author_handle}: {mention_text}")
372+373 # Log prompt details to separate logger
374 prompt_logger.debug(f"Full prompt being sent:\n{prompt}")
375+376 # Log concise prompt info to main logger
377 thread_handles_count = len(unique_handles)
378+ logger.info(
379+ f"💬 Sending to LLM: @{author_handle} mention | msg: \"{mention_text[:50]}...\" | context: {len(thread_context)} chars, {thread_handles_count} users")
380381 try:
382 # Use streaming to avoid 524 timeout errors
383 message_stream = CLIENT.agents.messages.create_stream(
384 agent_id=void_agent.id,
385 messages=[{"role": "user", "content": prompt}],
386+ # Step streaming only (faster than token streaming)
387+ stream_tokens=False,
388 max_steps=agent_config['max_steps']
389 )
390+391 # Collect the streaming response
392 all_messages = []
393 for chunk in message_stream:
···403 args = json.loads(chunk.tool_call.arguments)
404 # Format based on tool type
405 if tool_name == 'bluesky_reply':
406+ messages = args.get(
407+ 'messages', [args.get('message', '')])
408 lang = args.get('lang', 'en-US')
409 if messages and isinstance(messages, list):
410+ preview = messages[0][:100] + "..." if len(
411+ messages[0]) > 100 else messages[0]
412+ msg_count = f" ({len(messages)} msgs)" if len(
413+ messages) > 1 else ""
414+ logger.info(
415+ f"🔧 Tool call: {tool_name} → \"{preview}\"{msg_count} [lang: {lang}]")
416 else:
417+ logger.info(
418+ f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
419 elif tool_name == 'archival_memory_search':
420 query = args.get('query', 'unknown')
421+ logger.info(
422+ f"🔧 Tool call: {tool_name} → query: \"{query}\"")
423 elif tool_name == 'update_block':
424 label = args.get('label', 'unknown')
425+ value_preview = str(args.get('value', ''))[
426+ :50] + "..." if len(str(args.get('value', ''))) > 50 else str(args.get('value', ''))
427+ logger.info(
428+ f"🔧 Tool call: {tool_name} → {label}: \"{value_preview}\"")
429 else:
430 # Generic display for other tools
431+ args_str = ', '.join(
432+ f"{k}={v}" for k, v in args.items() if k != 'request_heartbeat')
433 if len(args_str) > 150:
434 args_str = args_str[:150] + "..."
435+ logger.info(
436+ f"🔧 Tool call: {tool_name}({args_str})")
437 except:
438 # Fallback to original format if parsing fails
439+ logger.info(
440+ f"🔧 Tool call: {tool_name}({chunk.tool_call.arguments[:150]}...)")
441 elif chunk.message_type == 'tool_return_message':
442 # Enhanced tool result logging
443 tool_name = chunk.name
444 status = chunk.status
445+446 if status == 'success':
447 # Try to show meaningful result info based on tool type
448 if hasattr(chunk, 'tool_return') and chunk.tool_return:
···452 if result_str.startswith('[') and result_str.endswith(']'):
453 try:
454 results = json.loads(result_str)
455+ logger.info(
456+ f"📋 Tool result: {tool_name} ✓ Found {len(results)} memory entries")
457 except:
458+ logger.info(
459+ f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
460 else:
461+ logger.info(
462+ f"📋 Tool result: {tool_name} ✓ {result_str[:100]}...")
463 elif tool_name == 'bluesky_reply':
464+ logger.info(
465+ f"📋 Tool result: {tool_name} ✓ Reply posted successfully")
466 elif tool_name == 'update_block':
467+ logger.info(
468+ f"📋 Tool result: {tool_name} ✓ Memory block updated")
469 else:
470 # Generic success with preview
471+ preview = result_str[:100] + "..." if len(
472+ result_str) > 100 else result_str
473+ logger.info(
474+ f"📋 Tool result: {tool_name} ✓ {preview}")
475 else:
476 logger.info(f"📋 Tool result: {tool_name} ✓")
477 elif status == 'error':
···479 error_preview = ""
480 if hasattr(chunk, 'tool_return') and chunk.tool_return:
481 error_str = str(chunk.tool_return)
482+ error_preview = error_str[:100] + \
483+ "..." if len(
484+ error_str) > 100 else error_str
485+ logger.info(
486+ f"📋 Tool result: {tool_name} ✗ Error: {error_preview}")
487 else:
488+ logger.info(
489+ f"📋 Tool result: {tool_name} ✗ Error occurred")
490 else:
491+ logger.info(
492+ f"📋 Tool result: {tool_name} - {status}")
493 elif chunk.message_type == 'assistant_message':
494 logger.info(f"💬 Assistant: {chunk.content[:150]}...")
495 else:
496+ logger.info(
497+ f"📨 {chunk.message_type}: {str(chunk)[:150]}...")
498 else:
499 logger.info(f"📦 Stream status: {chunk}")
500+501 # Log full chunk for debugging
502 logger.debug(f"Full streaming chunk: {chunk}")
503 all_messages.append(chunk)
504 if str(chunk) == 'done':
505 break
506+507 # Convert streaming response to standard format for compatibility
508 message_response = type('StreamingResponse', (), {
509 'messages': [msg for msg in all_messages if hasattr(msg, 'message_type')]
···517 logger.error(f"Mention text was: {mention_text}")
518 logger.error(f"Author: @{author_handle}")
519 logger.error(f"URI: {uri}")
520+0521 # Try to extract more info from different error types
522 if hasattr(api_error, 'response'):
523 logger.error(f"Error response object exists")
···525 logger.error(f"Response text: {api_error.response.text}")
526 if hasattr(api_error.response, 'json') and callable(api_error.response.json):
527 try:
528+ logger.error(
529+ f"Response JSON: {api_error.response.json()}")
530 except:
531 pass
532+533 # Check for specific error types
534 if hasattr(api_error, 'status_code'):
535 logger.error(f"API Status code: {api_error.status_code}")
···537 logger.error(f"API Response body: {api_error.body}")
538 if hasattr(api_error, 'headers'):
539 logger.error(f"API Response headers: {api_error.headers}")
540+541 if api_error.status_code == 413:
542+ logger.error(
543+ "413 Payload Too Large - moving to errors directory")
544 return None # Move to errors directory - payload is too large to ever succeed
545 elif api_error.status_code == 524:
546+ logger.error(
547+ "524 error - timeout from Cloudflare, will retry later")
548 return False # Keep in queue for retry
549+550 # Check if error indicates we should remove from queue
551 if 'status_code: 413' in error_str or 'Payload Too Large' in error_str:
552+ logger.warning(
553+ "Payload too large error, moving to errors directory")
554 return None # Move to errors directory - cannot be fixed by retry
555 elif 'status_code: 524' in error_str:
556 logger.warning("524 timeout error, keeping in queue for retry")
557 return False # Keep in queue for retry
558+559 raise
560561 # Log successful response
562 logger.debug("Successfully received response from Letta API")
563+ logger.debug(
564+ f"Number of messages in response: {len(message_response.messages) if hasattr(message_response, 'messages') else 'N/A'}")
565566 # Extract successful add_post_to_bluesky_reply_thread tool calls from the agent's response
567 reply_candidates = []
568 tool_call_results = {} # Map tool_call_id to status
569+570+ logger.debug(
571+ f"Processing {len(message_response.messages)} response messages...")
572+573 # First pass: collect tool return statuses
574 ignored_notification = False
575 ignore_reason = ""
576 ignore_category = ""
577+578 for message in message_response.messages:
579 if hasattr(message, 'tool_call_id') and hasattr(message, 'status') and hasattr(message, 'name'):
580 if message.name == 'add_post_to_bluesky_reply_thread':
581 tool_call_results[message.tool_call_id] = message.status
582+ logger.debug(
583+ f"Tool result: {message.tool_call_id} -> {message.status}")
584 elif message.name == 'ignore_notification':
585 # Check if the tool was successful
586 if hasattr(message, 'tool_return') and message.status == 'success':
···592 ignore_category = parts[1]
593 ignore_reason = parts[2]
594 ignored_notification = True
595+ logger.info(
596+ f"🚫 Notification ignored - Category: {ignore_category}, Reason: {ignore_reason}")
597 elif message.name == 'bluesky_reply':
598+ logger.error(
599+ "❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
600+ logger.error(
601+ "Please use add_post_to_bluesky_reply_thread instead.")
602+ logger.error(
603+ "Update the agent's tools using register_tools.py")
604 # Export agent state before terminating
605 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
606+ logger.info(
607+ "=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
608 exit(1)
609+610 # Second pass: process messages and check for successful tool calls
611 for i, message in enumerate(message_response.messages, 1):
612 # Log concise message info instead of full object
613 msg_type = getattr(message, 'message_type', 'unknown')
614 if hasattr(message, 'reasoning') and message.reasoning:
615+ logger.debug(
616+ f" {i}. {msg_type}: {message.reasoning[:100]}...")
617 elif hasattr(message, 'tool_call') and message.tool_call:
618 tool_name = message.tool_call.name
619 logger.debug(f" {i}. {msg_type}: {tool_name}")
620 elif hasattr(message, 'tool_return'):
621 tool_name = getattr(message, 'name', 'unknown_tool')
622+ return_preview = str(message.tool_return)[
623+ :100] if message.tool_return else "None"
624 status = getattr(message, 'status', 'unknown')
625+ logger.debug(
626+ f" {i}. {msg_type}: {tool_name} -> {return_preview}... (status: {status})")
627 elif hasattr(message, 'text'):
628 logger.debug(f" {i}. {msg_type}: {message.text[:100]}...")
629 else:
···632 # Check for halt_activity tool call
633 if hasattr(message, 'tool_call') and message.tool_call:
634 if message.tool_call.name == 'halt_activity':
635+ logger.info(
636+ "🛑 HALT_ACTIVITY TOOL CALLED - TERMINATING BOT")
637 try:
638 args = json.loads(message.tool_call.arguments)
639 reason = args.get('reason', 'Agent requested halt')
640 logger.info(f"Halt reason: {reason}")
641 except:
642 logger.info("Halt reason: <unable to parse>")
643+644 # Delete the queue file before terminating
645 if queue_filepath and queue_filepath.exists():
646 queue_filepath.unlink()
647+ logger.info(
648+ f"✅ Deleted queue file: {queue_filepath.name}")
649+650 # Also mark as processed to avoid reprocessing
651 processed_uris = load_processed_notifications()
652 processed_uris.add(notification_data.get('uri', ''))
653 save_processed_notifications(processed_uris)
654+655 # Export agent state before terminating
656 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
657+658 # Exit the program
659 logger.info("=== BOT TERMINATED BY AGENT ===")
660 exit(0)
661+662 # Check for deprecated bluesky_reply tool
663 if hasattr(message, 'tool_call') and message.tool_call:
664 if message.tool_call.name == 'bluesky_reply':
665+ logger.error(
666+ "❌ DEPRECATED TOOL DETECTED: bluesky_reply is no longer supported!")
667+ logger.error(
668+ "Please use add_post_to_bluesky_reply_thread instead.")
669+ logger.error(
670+ "Update the agent's tools using register_tools.py")
671 # Export agent state before terminating
672 export_agent_state(CLIENT, void_agent, skip_git=SKIP_GIT)
673+ logger.info(
674+ "=== BOT TERMINATED DUE TO DEPRECATED TOOL USE ===")
675 exit(1)
676+677 # Collect add_post_to_bluesky_reply_thread tool calls - only if they were successful
678 elif message.tool_call.name == 'add_post_to_bluesky_reply_thread':
679 tool_call_id = message.tool_call.tool_call_id
680+ tool_status = tool_call_results.get(
681+ tool_call_id, 'unknown')
682+683 if tool_status == 'success':
684 try:
685 args = json.loads(message.tool_call.arguments)
686 reply_text = args.get('text', '')
687 reply_lang = args.get('lang', 'en-US')
688+689 if reply_text: # Only add if there's actual content
690+ reply_candidates.append(
691+ (reply_text, reply_lang))
692+ logger.info(
693+ f"Found successful add_post_to_bluesky_reply_thread candidate: {reply_text[:50]}... (lang: {reply_lang})")
694 except json.JSONDecodeError as e:
695+ logger.error(
696+ f"Failed to parse tool call arguments: {e}")
697 elif tool_status == 'error':
698+ logger.info(
699+ f"⚠️ Skipping failed add_post_to_bluesky_reply_thread tool call (status: error)")
700 else:
701+ logger.warning(
702+ f"⚠️ Skipping add_post_to_bluesky_reply_thread tool call with unknown status: {tool_status}")
703704 # Check for conflicting tool calls
705 if reply_candidates and ignored_notification:
706+ logger.error(
707+ f"⚠️ CONFLICT: Agent called both add_post_to_bluesky_reply_thread and ignore_notification!")
708+ logger.error(
709+ f"Reply candidates: {len(reply_candidates)}, Ignore reason: {ignore_reason}")
710 logger.warning("Item will be left in queue for manual review")
711 # Return False to keep in queue
712 return False
713+714 if reply_candidates:
715 # Aggregate reply posts into a thread
716 reply_messages = []
···718 for text, lang in reply_candidates:
719 reply_messages.append(text)
720 reply_langs.append(lang)
721+722 # Use the first language for the entire thread (could be enhanced later)
723 reply_lang = reply_langs[0] if reply_langs else 'en-US'
724+725+ logger.info(
726+ f"Found {len(reply_candidates)} add_post_to_bluesky_reply_thread calls, building thread")
727+728 # Print the generated reply for testing
729 print(f"\n=== GENERATED REPLY THREAD ===")
730 print(f"To: @{author_handle}")
···744 else:
745 if len(reply_messages) == 1:
746 # Single reply - use existing function
747+ cleaned_text = bsky_utils.remove_outside_quotes(
748+ reply_messages[0])
749+ logger.info(
750+ f"Sending single reply: {cleaned_text[:50]}... (lang: {reply_lang})")
751 response = bsky_utils.reply_to_notification(
752 client=atproto_client,
753 notification=notification_data,
···756 )
757 else:
758 # Multiple replies - use new threaded function
759+ cleaned_messages = [bsky_utils.remove_outside_quotes(
760+ msg) for msg in reply_messages]
761+ logger.info(
762+ f"Sending threaded reply with {len(cleaned_messages)} messages (lang: {reply_lang})")
763 response = bsky_utils.reply_with_thread_to_notification(
764 client=atproto_client,
765 notification=notification_data,
···776 else:
777 # Check if notification was explicitly ignored
778 if ignored_notification:
779+ logger.info(
780+ f"Notification from @{author_handle} was explicitly ignored (category: {ignore_category})")
781 return "ignored"
782 else:
783+ logger.warning(
784+ f"No add_post_to_bluesky_reply_thread tool calls found for mention from @{author_handle}, moving to no_reply folder")
785 return "no_reply"
786787 except Exception as e:
···791 # Detach user blocks after agent response (success or failure)
792 if 'attached_handles' in locals() and attached_handles:
793 try:
794+ logger.info(
795+ f"Detaching user blocks for handles: {attached_handles}")
796+ detach_result = detach_user_blocks(
797+ attached_handles, void_agent)
798 logger.debug(f"Detach result: {detach_result}")
799 except Exception as detach_error:
800 logger.warning(f"Failed to detach user blocks: {detach_error}")
···
    notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

    # Determine priority based on author handle
    author_handle = getattr(notification.author, 'handle', '') if hasattr(notification, 'author') else ''
    priority_users = queue_config['priority_users']
    priority_prefix = "0_" if author_handle in priority_users else "1_"
    ···
            with open(existing_file, 'r') as f:
                existing_data = json.load(f)
                if existing_data.get('uri') == notification.uri:
                    logger.debug(f"Notification already queued (URI: {notification.uri})")
                    return False
        except Exception:  # unreadable queue file; skip it
            continue
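The exact queue filename format lives in an elided part of this diff; a hypothetical layout consistent with the prefix and hash computed above would be:

# Hypothetical filename layout (the real format is elided in this diff):
queue_filename = f"{priority_prefix}{notif_hash}.json"
# Names sort lexicographically, so "0_..." items (priority users) drain before "1_..." items.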
···
    try:
        # Get all JSON files in queue directory (excluding processed_notifications.json)
        # Files are sorted by name, which puts priority files first (0_ prefix before 1_ prefix)
        queue_files = sorted([f for f in QUEUE_DIR.glob("*.json") if f.name != "processed_notifications.json"])

        if not queue_files:
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        # Log current statistics
        elapsed_time = time.time() - start_time
        total_messages = sum(message_counters.values())
        messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

        logger.info(f"📊 Session stats: {total_messages} total messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies, {message_counters['follows']} follows) | {messages_per_minute:.1f} msg/min")

        for i, filepath in enumerate(queue_files, 1):
            logger.info(f"Processing queue file {i}/{len(queue_files)}: {filepath.name}")
            try:
                # Load notification data
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                success = False
                if notif_data['reason'] == "mention":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success is True:
                        message_counters['mentions'] += 1
                elif notif_data['reason'] == "reply":
                    success = process_mention(void_agent, atproto_client, notif_data, queue_filepath=filepath, testing_mode=testing_mode)
                    if success is True:
                        message_counters['replies'] += 1
                elif notif_data['reason'] == "follow":
                    author_handle = notif_data['author']['handle']
                    author_display_name = notif_data['author'].get('display_name', 'no display name')
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    logger.info(f"Notifying agent about new follower: @{author_handle}")
                    CLIENT.agents.messages.create(
                        agent_id=void_agent.id,
                        messages=[{"role": "user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                    if success:
                        message_counters['follows'] += 1
                ···
                    if success:
                        message_counters['reposts_skipped'] += 1
                else:
                    logger.warning(f"Unknown notification type: {notif_data['reason']}")
                    success = True  # Remove unknown types from queue

                # Handle file based on processing result. Note: success may be
                # True/False/None or a string sentinel, so compare explicitly;
                # a bare `if success:` would swallow "no_reply" and "ignored".
                if success is True:
                    if testing_mode:
                        logger.info(f"🧪 TESTING MODE: Keeping queue file: {filepath.name}")
                    else:
                        filepath.unlink()
                        logger.info(f"✅ Successfully processed and removed: {filepath.name}")

                    # Mark as processed to avoid reprocessing
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                elif success is None:  # Special case for moving to error directory
                    error_path = QUEUE_ERROR_DIR / filepath.name
                    filepath.rename(error_path)
                    logger.warning(f"❌ Moved {filepath.name} to errors directory")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                elif success == "no_reply":  # Special case for moving to no_reply directory
                    no_reply_path = QUEUE_NO_REPLY_DIR / filepath.name
                    filepath.rename(no_reply_path)
                    logger.info(f"📭 Moved {filepath.name} to no_reply directory")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                elif success == "ignored":  # Special case for explicitly ignored notifications
                    # For ignored notifications, we just delete them (not move to no_reply)
                    filepath.unlink()
                    logger.info(f"🚫 Deleted ignored notification: {filepath.name}")

                    # Also mark as processed to avoid retrying
                    processed_uris = load_processed_notifications()
                    processed_uris.add(notif_data['uri'])
                    save_processed_notifications(processed_uris)

                else:
                    logger.warning(f"⚠️ Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"💥 Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        ···
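Because process_mention hands back several different sentinel values, the dispatch above has to distinguish them explicitly rather than treat the result as a plain boolean. Summarizing the contract as inferred from the branches above:

# Outcome contract for a processed queue item (inferred from the dispatch above):
#   True        -> handled; delete the queue file and record the URI as processed
#   None        -> hard failure; move the file to QUEUE_ERROR_DIR, record as processed
#   "no_reply"  -> agent chose not to reply; move to QUEUE_NO_REPLY_DIR, record as processed
#   "ignored"   -> agent explicitly ignored it; delete the file, record as processed
#   False       -> transient failure; leave the file in the queue for a later retry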
···
    all_notifications = []
    cursor = None
    page_count = 0
    # Safety limit to prevent infinite loops
    max_pages = bot_config['max_notification_pages']

    logger.info("Fetching all unread notifications...")

    while page_count < max_pages:
        try:
            # Fetch notifications page
            ···
            notifications_response = atproto_client.app.bsky.notification.list_notifications(
                params={'limit': 100}
            )

            page_count += 1
            page_notifications = notifications_response.notifications

            # Count unread notifications in this page
            unread_count = sum(1 for n in page_notifications if not n.is_read and n.reason != "like")
            logger.debug(f"Page {page_count}: {len(page_notifications)} notifications, {unread_count} unread (non-like)")

            # Add all notifications to our list
            all_notifications.extend(page_notifications)

            # Check if we have more pages
            if hasattr(notifications_response, 'cursor') and notifications_response.cursor:
                cursor = notifications_response.cursor
                # If this page had no unread notifications, we can stop
                if unread_count == 0:
                    logger.info(f"No more unread notifications found after {page_count} pages")
                    break
            else:
                # No more pages
                logger.info(f"Fetched all notifications across {page_count} pages")
                break

        except Exception as e:
            error_str = str(e)
            logger.error(f"Error fetching notifications page {page_count}: {e}")

            # Handle specific API errors
            if 'rate limit' in error_str.lower():
                logger.warning("Rate limit hit while fetching notifications, will retry next cycle")
                break
            elif '401' in error_str or 'unauthorized' in error_str.lower():
                logger.error("Authentication error, re-raising exception")
                raise
            else:
                # For other errors, try to continue with what we have
                logger.warning("Continuing with notifications fetched so far")
                break

    # Queue all unread notifications (except likes)
    ···

    # Mark all notifications as seen immediately after queuing (unless in testing mode)
    if testing_mode:
        logger.info("🧪 TESTING MODE: Skipping marking notifications as seen")
    else:
        if new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")
        else:
            logger.debug("No new notifications to queue")

    # Now process the entire queue (old + new notifications)
    load_and_process_queued_notifications(void_agent, atproto_client, testing_mode)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
···


def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Void Bot - Bluesky autonomous agent')
    parser.add_argument('--test', action='store_true', help='Run in testing mode (no messages sent, queue files preserved)')
    parser.add_argument('--no-git', action='store_true', help='Skip git operations when exporting agent state')
    args = parser.parse_args()

    global TESTING_MODE
    TESTING_MODE = args.test

    # Store no-git flag globally for use in export_agent_state calls
    global SKIP_GIT
    SKIP_GIT = args.no_git

    if TESTING_MODE:
        logger.info("🧪 === RUNNING IN TESTING MODE ===")
        logger.info(" - No messages will be sent to Bluesky")
        ···
    logger.info("=== STARTING VOID BOT ===")
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Check if agent has required tools
    if hasattr(void_agent, 'tools') and void_agent.tools:
        tool_names = [tool.name for tool in void_agent.tools]
        # Check for bluesky-related tools
        bluesky_tools = [name for name in tool_names if 'bluesky' in name.lower() or 'reply' in name.lower()]
        if not bluesky_tools:
            logger.warning("No Bluesky-related tools found! Agent may not be able to reply.")
    else:
        logger.warning("Agent has no tools registered!")

    # Initialize Bluesky client
    logger.debug("Connecting to Bluesky")
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring, checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds")

    cycle_count = 0
    while True:
        ···
            # Log cycle completion with stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            if total_messages > 0:
                logger.info(f"Cycle {cycle_count} complete. Session totals: {total_messages} messages ({message_counters['mentions']} mentions, {message_counters['replies']} replies) | {messages_per_minute:.1f} msg/min")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            # Final stats
            elapsed_time = time.time() - start_time
            total_messages = sum(message_counters.values())
            messages_per_minute = (total_messages / elapsed_time * 60) if elapsed_time > 0 else 0

            logger.info("=== BOT STOPPED BY USER ===")
            logger.info(f"📊 Final session stats: {total_messages} total messages processed in {elapsed_time/60:.1f} minutes")
            logger.info(f" - {message_counters['mentions']} mentions")
            logger.info(f" - {message_counters['replies']} replies")
            logger.info(f" - {message_counters['follows']} follows")
            logger.info(f" - {message_counters['reposts_skipped']} reposts skipped")
            logger.info(f" - Average rate: {messages_per_minute:.1f} messages/minute")
            break
        except Exception as e:
            logger.error(f"=== ERROR IN MAIN LOOP CYCLE {cycle_count} ===")
            logger.error(f"Error details: {e}")
            # Wait a bit longer on errors
            logger.info(f"Sleeping for {FETCH_NOTIFICATIONS_DELAY_SEC * 2} seconds due to error...")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
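For reference, main() is wired as a simple CLI; the script's own filename is not visible in this diff, so void_bot.py below is only a placeholder. Running python void_bot.py --test exercises full cycles without sending anything to Bluesky or deleting queue files, and python void_bot.py --no-git exports agent state without staging the export in git.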
bsky_utils.py (+79, -47)
import json
import yaml
import dotenv
import os
import logging
from typing import Optional, Dict, Any, List
···
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables
dotenv.load_dotenv(override=True)


# Strip fields. A list of fields to remove from a JSON object
STRIP_FIELDS = [
    ···
    "mime_type",
    "size",
]


def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization."""
    if hasattr(obj, '__dict__'):
        ···


def flatten_thread_structure(thread_data):
    """
    Flatten a nested thread structure into a list while preserving all data.

    Args:
        thread_data: The thread data from get_post_thread

    Returns:
        Dict with 'posts' key containing a list of posts in chronological order
    """
    posts = []

    def traverse_thread(node):
        """Recursively traverse the thread structure to collect posts."""
        if not node:
            return

        # If this node has a parent, traverse it first (to maintain chronological order)
        if hasattr(node, 'parent') and node.parent:
            traverse_thread(node.parent)

        # Then add this node's post
        if hasattr(node, 'post') and node.post:
            # Convert to dict if needed to ensure we can process it
            ···
            post_dict = node.post.copy()
        else:
            post_dict = {}

        posts.append(post_dict)

    # Handle the thread structure
    if hasattr(thread_data, 'thread'):
        # Start from the main thread node
        traverse_thread(thread_data.thread)
    elif hasattr(thread_data, '__dict__') and 'thread' in thread_data.__dict__:
        traverse_thread(thread_data.__dict__['thread'])

    # Return a simple structure with posts list
    return {'posts': posts}
···
    """
    # First flatten the thread structure to avoid deep nesting
    flattened = flatten_thread_structure(thread)

    # Convert complex objects to basic types
    basic_thread = convert_to_basic_types(flattened)
    ···
    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def get_session(username: str) -> Optional[str]:
    try:
        with open(f"session_{username}.txt", encoding="UTF-8") as f:
            ···
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None


def save_session(username: str, session_string: str) -> None:
    with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    logger.debug(f"Session changed: {event} {repr(session)}")
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.debug(f"Saving changed session for {username}")
        save_session(username, session.export())


def init_client(username: str, password: str, pds_uri: str = "https://bsky.social") -> Client:
    if pds_uri is None:
        ···
        password = config['password']
        pds_uri = config['pds_uri']
    except (ImportError, FileNotFoundError, KeyError) as e:
        logger.warning(f"Could not load from config file ({e}), falling back to environment variables")
        username = os.getenv("BSKY_USERNAME")
        password = os.getenv("BSKY_PASSWORD")
        pds_uri = os.getenv("PDS_URI", "https://bsky.social")
    ···
    return init_client(username, password, pds_uri)


def remove_outside_quotes(text: str) -> str:
    """
    Remove outside double quotes from response text.

    Only handles double quotes to avoid interfering with contractions:
    - Double quotes: "text" → text
    - Preserves single quotes and internal quotes

    Args:
        text: The text to process

    Returns:
        Text with outside double quotes removed
    """
    if not text or len(text) < 2:
        return text

    text = text.strip()

    # Only remove double quotes from start and end
    if text.startswith('"') and text.endswith('"'):
        return text[1:-1]

    return text
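A quick illustration of the behavior the docstring promises (assertions only, not part of the module):

assert remove_outside_quotes('"hello world"') == 'hello world'   # outer double quotes stripped
assert remove_outside_quotes("it's fine") == "it's fine"         # contractions untouched
assert remove_outside_quotes('say "hi" now') == 'say "hi" now'   # internal quotes preserved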

def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None, lang: Optional[str] = None) -> Dict[str, Any]:
    """
    ···
        The response from sending the post
    """
    import re

    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions and URLs)
    facets = []
    text_bytes = text.encode("UTF-8")

    # Parse mentions - fixed to handle @ at start of text
    mention_regex = rb"(?:^|[$|\W])(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"

    for m in re.finditer(mention_regex, text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        # Adjust byte positions to account for the optional prefix
        ···
                        byteStart=mention_start,
                        byteEnd=mention_end
                    ),
                    features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                )
            )
        except Exception as e:
            # Handle specific error cases
            error_str = str(e)
            if 'Could not find user info' in error_str or 'InvalidRequest' in error_str:
                logger.warning(f"User @{handle} not found (account may be deleted/suspended), skipping mention facet")
            elif 'BadRequestError' in error_str:
                logger.warning(f"Bad request when resolving @{handle}, skipping mention facet: {e}")
            else:
                logger.debug(f"Failed to resolve handle @{handle}: {e}")
            continue

    # Parse URLs - fixed to handle URLs at start of text
    url_regex = rb"(?:^|[$|\W])(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"

    for m in re.finditer(url_regex, text_bytes):
        url = m.group(1).decode("UTF-8")
        # Adjust byte positions to account for the optional prefix
        ···
    if facets:
        response = client.send_post(
            text=text,
            reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
            facets=facets,
            langs=[lang] if lang else None
        )
    else:
        response = client.send_post(
            text=text,
            reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
            langs=[lang] if lang else None
        )
    ···
        The thread data or None if not found
    """
    try:
        thread = client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
        return thread
    except Exception as e:
        error_str = str(e)
        # Handle specific error cases more gracefully
        if 'Could not find user info' in error_str or 'InvalidRequest' in error_str:
            logger.warning(f"User account not found for post URI {uri} (account may be deleted/suspended)")
        elif 'NotFound' in error_str or 'Post not found' in error_str:
            logger.warning(f"Post not found for URI {uri}")
        elif 'BadRequestError' in error_str:
            logger.warning(f"Bad request error for URI {uri}: {e}")
        else:
            logger.error(f"Error fetching post thread: {e}")
        return None
···
        logger.error("Reply messages list cannot be empty")
        return None
    if len(reply_messages) > 15:
        logger.error(f"Cannot send more than 15 reply messages (got {len(reply_messages)})")
        return None

    # Get the post URI and CID from the notification (handle both dict and object)
    if isinstance(notification, dict):
        post_uri = notification.get('uri')
        ···

    # Get the thread to find the root post
    thread_data = get_post_thread(client, post_uri)

    root_uri = post_uri
    root_cid = post_cid
    ···
    responses = []
    current_parent_uri = post_uri
    current_parent_cid = post_cid

    for i, message in enumerate(reply_messages):
        logger.info(f"Sending reply {i+1}/{len(reply_messages)}: {message[:50]}...")

        # Send this reply
        response = reply_to_post(
            client=client,
            ···
            root_cid=root_cid,
            lang=lang
        )

        if not response:
            logger.error(f"Failed to send reply {i+1}, posting system failure message")
            # Try to post a system failure message
            failure_response = reply_to_post(
                client=client,
                ···
                current_parent_uri = failure_response.uri
                current_parent_cid = failure_response.cid
            else:
                logger.error("Could not even send system failure message, stopping thread")
                return responses if responses else None
        else:
            responses.append(response)
            ···
            if i < len(reply_messages) - 1:  # Not the last message
                current_parent_uri = response.uri
                current_parent_cid = response.cid

    logger.info(f"Successfully sent {len(responses)} threaded replies")
    return responses
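One detail worth calling out in reply_to_post above: AT Protocol facets index into the post text by UTF-8 byte offset, which is why the mention and URL regexes run over text.encode("UTF-8") rather than the string itself. A minimal standalone illustration (not part of the module):

# Byte offsets and character offsets diverge as soon as the text is non-ASCII.
text = "héllo @alice.bsky.social"
char_start = text.index("@")                          # 6 (character offset)
byte_start = len(text[:char_start].encode("UTF-8"))   # 7 ('é' encodes to two bytes)
# Facet byteStart/byteEnd must use the byte value, matching the regex byte spans.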
register_tools.py (+16, -8)
···
import sys
import logging
from typing import List, Optional
from letta_client import Letta
from rich.console import Console
from rich.table import Table
from config_loader import get_config, get_letta_config, get_agent_config

# Import standalone functions and their schemas
from tools.search import search_bluesky_posts, SearchArgs
···
from tools.thread import add_post_to_bluesky_reply_thread, ReplyThreadPostArgs
from tools.ignore import ignore_notification, IgnoreNotificationArgs

config = get_config()
letta_config = get_letta_config()
agent_config = get_agent_config()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
console = Console()
···
]


def register_tools(agent_name: Optional[str] = None, tools: Optional[List[str]] = None):
    """Register tools with a Letta agent.

    Args:
        agent_name: Name of the agent to attach tools to. If None, uses the config default.
        tools: List of tool names to register. If None, registers all tools.
    """
    # Use agent name from config if not provided
    if agent_name is None:
        agent_name = agent_config['name']

    try:
        # Initialize Letta client with API key from config
        client = Letta(token=letta_config['api_key'])

        # Find the agent
        agents = client.agents.list()
        ···
    import argparse

    parser = argparse.ArgumentParser(description="Register Void tools with a Letta agent")
    parser.add_argument("agent", nargs="?", default=None, help=f"Agent name (default: {agent_config['name']})")
    parser.add_argument("--tools", nargs="+", help="Specific tools to register (default: all)")
    parser.add_argument("--list", action="store_true", help="List available tools")
    ···
    if args.list:
        list_available_tools()
    else:
        # Use config default if no agent specified
        agent_name = args.agent if args.agent is not None else agent_config['name']
        console.print(f"\n[bold]Registering tools for agent: {agent_name}[/bold]\n")
        register_tools(agent_name, args.tools)
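For illustration, two ways this entry point might be used; the tool name comes from the imports above, and the agent name assumes the config default is still "void":

register_tools()                                    # all tools, config-default agent
register_tools("void", ["search_bluesky_posts"])    # a single tool for a named agent
# Or from the shell (filename as in the diff header):
#   python register_tools.py --list
#   python register_tools.py void --tools search_bluesky_posts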
requirements.txt (+23)
# Core dependencies for Void Bot

# Configuration and utilities
PyYAML>=6.0.2
rich>=14.0.0
python-dotenv>=1.0.0

# Letta API client
letta-client>=0.1.198

# AT Protocol (Bluesky) client
atproto>=0.0.54

# HTTP client for API calls
httpx>=0.28.1
httpx-sse>=0.4.0
requests>=2.31.0

# Data validation
pydantic>=2.11.7

# Async support
anyio>=4.9.0
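Assuming a standard Python environment, these dependencies install with pip install -r requirements.txt; note the pins above are lower bounds (>=), not exact versions.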