a digital person for bluesky
at toolchange 15 kB view raw
"""Void: a Letta-backed social agent that answers notifications on Bluesky.

The bot polls Bluesky for unread notifications, persists each one as a JSON
file in ``QUEUE_DIR`` and then works through that queue, handing mentions and
replies to the Letta agent. A queue file is deleted only once its
notification has been handled, so failed items are retried on the next poll.
"""

import hashlib
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from time import sleep

from letta_client import Letta
from rich import print  # pretty printing tools

import bsky_utils
from bsky_utils import thread_to_yaml_string
from utils import (
    upsert_block,
    upsert_agent
)

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("void_bot")


# Create a client with extended timeout for LLM operations
CLIENT = Letta(
    token=os.environ["LETTA_API_KEY"],
    timeout=300  # 5 minutes timeout for API calls
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)


def initialize_void():
    """Ensure the shared memory blocks and the "void" agent exist.

    Returns:
        The agent object returned by ``upsert_agent``.
    """
    # Ensure that a shared zeitgeist block exists
    zeitgeist_block = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment."
    )

    # Ensure that a shared void personality block exists
    persona_block = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void."
    )

    # Ensure that a shared void human block exists
    human_block = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network."
    )

    # Create the agent if it doesn't exist
    void_agent = upsert_agent(
        CLIENT,
        name="void",
        block_ids=[
            persona_block.id,
            human_block.id,
            zeitgeist_block.id,
        ],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID
    )

    return void_agent


def _extract_mention_fields(notification_data):
    """Return ``(uri, cid, mention_text, author_handle, author_name)``.

    Accepts either the dict form stored in the queue or a notification
    object exposing the same fields as attributes.
    """
    # Handle both dict and object inputs for backwards compatibility
    if isinstance(notification_data, dict):
        author = notification_data['author']
        uri = notification_data['uri']
        cid = notification_data['cid']
        mention_text = notification_data.get('record', {}).get('text', '')
        author_handle = author['handle']
        display_name = author.get('display_name')
    else:
        # Legacy object access
        uri = notification_data.uri
        cid = notification_data.cid
        mention_text = getattr(notification_data.record, 'text', "")
        author_handle = notification_data.author.handle
        display_name = notification_data.author.display_name

    # Fall back to the handle when no display name is set
    return uri, cid, mention_text, author_handle, display_name or author_handle


def _build_mention_prompt(author_handle, author_name, mention_text, thread_context, uri, cid):
    """Build the user prompt sent to the agent for a single mention."""
    return f"""You received a mention on Bluesky from @{author_handle} ({author_name}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the reply_to_bluesky_post tool to send a response less than 300 characters. The required parameters are:
- text: Your reply message (max 300 chars)
- reply_to_uri: {uri}
- reply_to_cid: {cid}"""


def _extract_reply_text(messages):
    """Return the reply text found in the agent's messages, or ``""``.

    The first ``reply_to_bluesky_post`` tool call whose arguments parse as
    JSON wins; a message with a non-empty ``text`` attribute is used as a
    fallback. Other tool calls are ignored.
    """
    for message in messages:
        print(message)

        tool_call = getattr(message, 'tool_call', None)
        if tool_call:
            # Check if this is a ToolCallMessage with bluesky_reply tool
            if tool_call.name == 'reply_to_bluesky_post':
                # Parse the JSON arguments to get the message
                try:
                    args = json.loads(tool_call.arguments)
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to parse tool call arguments: {e}")
                    continue
                reply_text = args.get('text', '')
                logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
                return reply_text
        elif getattr(message, 'text', None):
            # Fallback to text message if available
            return message.text

    return ""


def process_mention(void_agent, atproto_client, notification_data) -> bool:
    """Process a mention and generate a reply using the Letta agent.

    Args:
        void_agent: Agent object; only its ``id`` is used here.
        atproto_client: Logged-in Bluesky client.
        notification_data: Queued notification dict, or a notification
            object with the same fields as attributes.

    Returns:
        True if the notification is finished with (replied to, post no
        longer exists, or the agent produced no reply) and can be removed
        from the queue; False if it should stay queued for a retry.
    """
    try:
        uri, cid, mention_text, author_handle, author_name = _extract_mention_fields(
            notification_data
        )

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 80,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # Check if this is a NotFound error
            if 'NotFound' in error_str or 'Post not found' in error_str:
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            # Re-raise other errors
            logger.error(f"Error fetching thread: {e}")
            raise

        # Get thread context as YAML string
        thread_context = thread_to_yaml_string(thread)

        # Create a prompt for the Letta agent with thread context
        prompt = _build_mention_prompt(
            author_handle, author_name, mention_text, thread_context, uri, cid
        )

        # Get response from Letta agent
        logger.info(f"@{author_handle}: {mention_text}")
        logger.debug(f"Prompt being sent: {prompt}")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id=void_agent.id,
                messages=[{"role": "user", "content": prompt}]
            )
        except Exception as api_error:
            error_str = str(api_error)
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Error type: {type(api_error).__name__}")
            logger.error(f"Mention text was: {mention_text}")
            logger.error(f"Author: @{author_handle}")
            logger.error(f"URI: {uri}")

            # Check for specific error types
            if hasattr(api_error, 'status_code'):
                logger.error(f"API Status code: {api_error.status_code}")
                if api_error.status_code == 524:
                    logger.error("524 error - timeout from Cloudflare, will retry later")
                    return False  # Keep in queue for retry

            # Some errors only expose the status code in their message text
            if 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            # Anything else is handled by the outer handler, which also
            # keeps the notification queued
            raise

        # Extract the reply text from the agent's response
        reply_text = _extract_reply_text(message_response.messages)

        if not reply_text:
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

        # Print the generated reply for testing
        print("\n=== GENERATED REPLY ===")
        print(f"To: @{author_handle}")
        print(f"Reply: {reply_text}")
        print("======================\n")

        # Send the reply
        logger.info(f"Sending reply: {reply_text[:50]}...")
        response = bsky_utils.reply_to_notification(
            client=atproto_client,
            notification=notification_data,
            reply_text=reply_text
        )

        if response:
            logger.info(f"Successfully replied to @{author_handle}")
            return True

        logger.error(f"Failed to send reply to @{author_handle}")
        return False

    except Exception as e:
        logger.error(f"Error processing mention: {e}")
        return False


def notification_to_dict(notification) -> dict:
    """Convert a notification object to a dictionary for JSON serialization."""
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')
    else:
        record_text = ''

    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': notification.author.handle,
            'display_name': notification.author.display_name,
            'did': notification.author.did
        },
        'record': {
            'text': record_text
        }
    }


def save_notification_to_queue(notification) -> bool:
    """Save a notification to the queue directory with hash-based filename.

    Returns:
        True if a new queue file was written; False if the notification is
        already queued or could not be saved.
    """
    try:
        # Convert notification to dict
        notif_dict = notification_to_dict(notification)

        # Create JSON string
        notif_json = json.dumps(notif_dict, sort_keys=True)

        # Generate hash for filename (to avoid duplicates)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Skip if already queued. The timestamp prefix differs on every
        # call, so duplicates have to be matched on the content hash rather
        # than on the full filename.
        already_queued = next(QUEUE_DIR.glob(f"*_{notif_hash}.json"), None)
        if already_queued is not None:
            logger.debug(f"Notification already queued: {already_queued.name}")
            return False

        # Create filename with timestamp and hash; the timestamp prefix makes
        # sorted() in the queue loader process files oldest-first
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Write to file
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filename}")
        return True

    except Exception as e:
        logger.error(f"Error saving notification to queue: {e}")
        return False


def _handle_queued_notification(void_agent, atproto_client, notif_data) -> bool:
    """Dispatch one queued notification dict by its ``reason``.

    Returns True when the notification can be removed from the queue.
    """
    reason = notif_data['reason']

    if reason in ("mention", "reply"):
        return process_mention(void_agent, atproto_client, notif_data)

    if reason == "follow":
        author_handle = notif_data['author']['handle']
        # The key is always present in queued data but may hold None, so a
        # .get() default would never apply
        author_display_name = notif_data['author'].get('display_name') or 'no display name'
        follow_update = f"@{author_handle} ({author_display_name}) started following you."
        CLIENT.agents.messages.create(
            agent_id=void_agent.id,
            messages=[{"role": "user", "content": f"Update: {follow_update}"}]
        )
        return True  # Follow updates are always successful

    if reason == "repost":
        logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
        return True  # Skip reposts but mark as successful to remove from queue

    logger.warning(f"Unknown notification type: {reason}")
    return True  # Remove unknown types from queue


def load_and_process_queued_notifications(void_agent, atproto_client) -> None:
    """Load and process all notifications from the queue."""
    try:
        # Get all JSON files in queue directory
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                # Load notification data
                with open(filepath, 'r', encoding='utf-8') as f:
                    notif_data = json.load(f)

                # Process based on type using dict data directly
                success = _handle_queued_notification(void_agent, atproto_client, notif_data)

                # Remove file only after successful processing
                if success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                # Keep the file for retry later
                logger.error(f"Error processing queued notification {filepath.name}: {e}")

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")


def process_notifications(void_agent, atproto_client) -> None:
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # First, process any existing queued notifications
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Get current time for marking notifications as seen. It is taken
        # before the fetch so anything arriving in between stays unread.
        last_seen_at = atproto_client.get_current_time_iso()

        # Fetch notifications
        notifications_response = atproto_client.app.bsky.notification.list_notifications()

        # Queue all unread notifications (except likes)
        new_count = 0
        for notification in notifications_response.notifications:
            if notification.is_read or notification.reason == "like":
                continue
            if save_notification_to_queue(notification):
                new_count += 1

        # Mark all notifications as seen immediately after queuing
        if new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")

        # Process the queue (including any newly added notifications)
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")


def main() -> None:
    """Main bot loop that continuously monitors for notifications."""
    logger.info("Initializing Void bot...")

    # Initialize the Letta agent
    void_agent = initialize_void()
    logger.info(f"Void agent initialized: {void_agent.id}")

    # Initialize Bluesky client
    atproto_client = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    # Main loop
    logger.info(f"Starting notification monitoring (checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds)...")

    while True:
        try:
            process_notifications(void_agent, atproto_client)
            print("Sleeping")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)

        except KeyboardInterrupt:
            logger.info("Bot stopped by user")
            break
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            # Wait a bit longer on errors
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)


if __name__ == "__main__":
    main()