"""A digital person for Bluesky."""
1from rich import print # pretty printing tools
2from time import sleep
3from letta_client import Letta
4from bsky_utils import thread_to_yaml_string
5import os
6import logging
7import json
8import hashlib
9from pathlib import Path
10from datetime import datetime
11
12from utils import (
13 upsert_block,
14 upsert_agent
15)
16
17import bsky_utils
18
# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Module-wide logger used by every function in this file.
logger = logging.getLogger("void_bot")


# Create a client with extended timeout for LLM operations
CLIENT= Letta(
    token=os.environ["LETTA_API_KEY"],  # required; raises KeyError at import time if unset
    timeout=300  # 5 minutes timeout for API calls
)

# Use the "Bluesky" project
PROJECT_ID = "5ec33d52-ab14-4fd6-91b5-9dbc43e888a8"

# Notification check delay
# Seconds to sleep between notification polls in the main loop.
FETCH_NOTIFICATIONS_DELAY_SEC = 30

# Queue directory
# On-disk queue of pending notifications (one JSON file each),
# created relative to the process working directory.
QUEUE_DIR = Path("queue")
QUEUE_DIR.mkdir(exist_ok=True)
41
def initialize_void():
    """Create (or fetch) Void's shared memory blocks and the agent itself.

    Upserts three shared blocks (zeitgeist, persona, humans), then upserts
    the "void" agent attached to them. Returns the agent.
    """
    # Shared understanding of the current social environment.
    zeitgeist = upsert_block(
        CLIENT,
        label="zeitgeist",
        value="I don't currently know anything about what is happening right now.",
        description="A block to store your understanding of the current social environment.",
    )

    # Shared personality block.
    persona = upsert_block(
        CLIENT,
        label="void-persona",
        value="My name is Void. I live in the void. I must develop my personality.",
        description="The personality of Void.",
    )

    # Shared knowledge about Bluesky users Void has interacted with.
    humans = upsert_block(
        CLIENT,
        label="void-humans",
        value="I haven't seen any bluesky users yet. I will update this block when I learn things about users, identified by their handles such as @cameron.pfiffer.org.",
        description="A block to store your understanding of users you talk to or observe on the bluesky social network.",
    )

    # Upsert the agent, attached to the three shared blocks above.
    return upsert_agent(
        CLIENT,
        name="void",
        block_ids=[persona.id, humans.id, zeitgeist.id],
        tags=["social agent", "bluesky"],
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        description="A social media agent trapped in the void.",
        project_id=PROJECT_ID,
    )
85
86
def process_mention(void_agent, atproto_client, notification_data):
    """Process a mention and generate a reply using the Letta agent.

    Returns True if successfully processed, False otherwise.

    The boolean is a queue-control signal for the caller:
    True  -> the queued notification may be deleted (handled or unrecoverable),
    False -> keep the notification queued and retry later (transient failure).
    """
    try:
        # Handle both dict and object inputs for backwards compatibility
        if isinstance(notification_data, dict):
            # Dict form: produced by notification_to_dict() / read back from the queue.
            uri = notification_data['uri']
            cid = notification_data['cid']
            mention_text = notification_data.get('record', {}).get('text', '')
            author_handle = notification_data['author']['handle']
            # display_name may be None; fall back to the handle.
            author_name = notification_data['author'].get('display_name') or author_handle
        else:
            # Legacy object access
            uri = notification_data.uri
            cid = notification_data.cid
            mention_text = notification_data.record.text if hasattr(notification_data.record, 'text') else ""
            author_handle = notification_data.author.handle
            author_name = notification_data.author.display_name or author_handle

        # Retrieve the entire thread associated with the mention
        try:
            thread = atproto_client.app.bsky.feed.get_post_thread({
                'uri': uri,
                'parent_height': 80,
                'depth': 10
            })
        except Exception as e:
            error_str = str(e)
            # Check if this is a NotFound error
            if 'NotFound' in error_str or 'Post not found' in error_str:
                # The post was deleted; nothing to reply to, so drop from queue.
                logger.warning(f"Post not found for URI {uri}, removing from queue")
                return True  # Return True to remove from queue
            else:
                # Re-raise other errors
                logger.error(f"Error fetching thread: {e}")
                raise

        # Get thread context as YAML string
        thread_context = thread_to_yaml_string(thread)

        # Create a prompt for the Letta agent with thread context
        prompt = f"""You received a mention on Bluesky from @{author_handle} ({author_name or author_handle}).

MOST RECENT POST (the mention you're responding to):
"{mention_text}"

FULL THREAD CONTEXT:
```yaml
{thread_context}
```

The YAML above shows the complete conversation thread. The most recent post is the one mentioned above that you should respond to, but use the full thread context to understand the conversation flow.

Use the reply_to_bluesky_post tool to send a response less than 300 characters. The required parameters are:
- text: Your reply message (max 300 chars)
- reply_to_uri: {uri}
- reply_to_cid: {cid}"""

        # Get response from Letta agent
        logger.info(f"@{author_handle}: {mention_text}")
        logger.debug(f"Prompt being sent: {prompt}")

        try:
            message_response = CLIENT.agents.messages.create(
                agent_id = void_agent.id,
                messages = [{"role":"user", "content": prompt}]
            )
        except Exception as api_error:
            error_str = str(api_error)
            logger.error(f"Letta API error: {api_error}")
            logger.error(f"Error type: {type(api_error).__name__}")
            logger.error(f"Mention text was: {mention_text}")
            logger.error(f"Author: @{author_handle}")
            logger.error(f"URI: {uri}")

            # Check for specific error types
            if hasattr(api_error, 'status_code'):
                logger.error(f"API Status code: {api_error.status_code}")
                if api_error.status_code == 524:
                    # Cloudflare gateway timeout: transient, keep the item queued.
                    logger.error("524 error - timeout from Cloudflare, will retry later")
                    return False  # Keep in queue for retry

            # Check if error indicates we should remove from queue
            # (string-match fallback when the exception has no status_code attribute)
            if 'status_code: 524' in error_str:
                logger.warning("524 timeout error, keeping in queue for retry")
                return False  # Keep in queue for retry

            raise

        # Extract the reply text from the agent's response
        reply_text = ""
        for message in message_response.messages:
            print(message)

            # Check if this is a ToolCallMessage with bluesky_reply tool
            if hasattr(message, 'tool_call') and message.tool_call:
                if message.tool_call.name == 'reply_to_bluesky_post':
                    # Parse the JSON arguments to get the message
                    try:
                        args = json.loads(message.tool_call.arguments)
                        reply_text = args.get('text', '')
                        logger.info(f"Extracted reply from tool call: {reply_text[:50]}...")
                        break
                    except json.JSONDecodeError as e:
                        logger.error(f"Failed to parse tool call arguments: {e}")

            # Fallback to text message if available
            # NOTE(review): this takes the FIRST message exposing a .text
            # attribute, which may be assistant reasoning rather than the
            # intended reply — confirm message ordering from the Letta API.
            elif hasattr(message, 'text') and message.text:
                reply_text = message.text
                break

        if reply_text:
            # Print the generated reply for testing
            print(f"\n=== GENERATED REPLY ===")
            print(f"To: @{author_handle}")
            print(f"Reply: {reply_text}")
            print(f"======================\n")

            # Send the reply
            # NOTE(review): the agent was instructed to reply via its own
            # reply_to_bluesky_post tool; if that tool actually posts, the
            # send below may duplicate it — TODO confirm tool semantics.
            logger.info(f"Sending reply: {reply_text[:50]}...")
            response = bsky_utils.reply_to_notification(
                client=atproto_client,
                notification=notification_data,
                reply_text=reply_text
            )

            if response:
                logger.info(f"Successfully replied to @{author_handle}")
                return True
            else:
                logger.error(f"Failed to send reply to @{author_handle}")
                return False
        else:
            # No usable reply; drop the notification rather than retry forever.
            logger.warning(f"No reply generated for mention from @{author_handle}, removing notification from queue")
            return True

    except Exception as e:
        # Any unhandled error keeps the notification queued for a later retry.
        logger.error(f"Error processing mention: {e}")
        return False
226
227
def notification_to_dict(notification):
    """Flatten a Bluesky notification object into a JSON-serializable dict."""
    author = notification.author

    # Some notification kinds carry no record (or a record without text).
    record_text = ''
    if hasattr(notification, 'record'):
        record_text = getattr(notification.record, 'text', '')

    return {
        'uri': notification.uri,
        'cid': notification.cid,
        'reason': notification.reason,
        'is_read': notification.is_read,
        'indexed_at': notification.indexed_at,
        'author': {
            'handle': author.handle,
            'display_name': author.display_name,
            'did': author.did,
        },
        'record': {'text': record_text},
    }
245
246
def save_notification_to_queue(notification):
    """Persist a notification as a JSON file in QUEUE_DIR.

    The filename embeds a timestamp (so sorted() yields FIFO order) and a
    16-hex-char content hash (for de-duplication).

    Returns:
        True if a new file was written; False on duplicate or error.
    """
    try:
        # Convert notification to a JSON-serializable dict.
        notif_dict = notification_to_dict(notification)

        # Canonical JSON so identical notifications hash identically.
        notif_json = json.dumps(notif_dict, sort_keys=True)
        notif_hash = hashlib.sha256(notif_json.encode()).hexdigest()[:16]

        # Skip if a file with this content hash is already queued.
        # BUGFIX: the timestamp prefix differs between polling cycles, so an
        # exact-filename existence check never matched earlier entries and the
        # same notification was re-queued every poll. Match on the hash suffix.
        if any(QUEUE_DIR.glob(f"*_{notif_hash}.json")):
            logger.debug(f"Notification already queued: {notif_hash}")
            return False

        # Create filename with timestamp and hash.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_{notification.reason}_{notif_hash}.json"
        filepath = QUEUE_DIR / filename

        # Write to file.
        with open(filepath, 'w') as f:
            json.dump(notif_dict, f, indent=2)

        logger.info(f"Queued notification: {filename}")
        return True

    except Exception as e:
        # Best-effort: a queueing failure must not crash the polling loop.
        logger.error(f"Error saving notification to queue: {e}")
        return False
279
280
def load_and_process_queued_notifications(void_agent, atproto_client):
    """Drain the on-disk notification queue, oldest first.

    Each queue file is deleted only after successful processing; failures
    (or unexpected exceptions) leave the file in place for a later retry.

    Args:
        void_agent: the Letta agent that generates replies.
        atproto_client: authenticated Bluesky client.
    """
    try:
        # Timestamp-prefixed filenames make sorted() yield FIFO order.
        queue_files = sorted(QUEUE_DIR.glob("*.json"))

        if not queue_files:
            logger.debug("No queued notifications to process")
            return

        logger.info(f"Processing {len(queue_files)} queued notifications")

        for filepath in queue_files:
            try:
                # Load notification data.
                with open(filepath, 'r') as f:
                    notif_data = json.load(f)

                reason = notif_data['reason']
                success = False
                if reason in ("mention", "reply"):
                    # Mentions and replies share the same reply pipeline.
                    success = process_mention(void_agent, atproto_client, notif_data)
                elif reason == "follow":
                    author_handle = notif_data['author']['handle']
                    # display_name may be present but None, so `or` (not
                    # dict.get's default) must supply the fallback.
                    author_display_name = notif_data['author'].get('display_name') or 'no display name'
                    follow_update = f"@{author_handle} ({author_display_name}) started following you."
                    CLIENT.agents.messages.create(
                        agent_id = void_agent.id,
                        messages = [{"role":"user", "content": f"Update: {follow_update}"}]
                    )
                    success = True  # Follow updates are always successful
                elif reason == "repost":
                    logger.info(f"Skipping repost notification from @{notif_data['author']['handle']}")
                    success = True  # Skip reposts but mark as successful to remove from queue
                else:
                    logger.warning(f"Unknown notification type: {reason}")
                    success = True  # Remove unknown types from queue

                # Remove file only after successful processing.
                if success:
                    filepath.unlink()
                    logger.info(f"Processed and removed: {filepath.name}")
                else:
                    logger.warning(f"Failed to process {filepath.name}, keeping in queue for retry")

            except Exception as e:
                logger.error(f"Error processing queued notification {filepath.name}: {e}")
                # Keep the file for retry later

    except Exception as e:
        logger.error(f"Error loading queued notifications: {e}")
334
335
def process_notifications(void_agent, atproto_client):
    """Fetch new notifications, queue them, and process the queue."""
    try:
        # Drain anything left over from a previous pass before fetching.
        load_and_process_queued_notifications(void_agent, atproto_client)

        # Watermark taken before fetching; used for update_seen below.
        last_seen_at = atproto_client.get_current_time_iso()

        response = atproto_client.app.bsky.notification.list_notifications()

        # Persist every unread notification, skipping likes entirely.
        new_count = 0
        for note in response.notifications:
            if note.is_read or note.reason == "like":
                continue
            if save_notification_to_queue(note):
                new_count += 1

        # Mark everything as seen once the new arrivals are safely on disk.
        if new_count > 0:
            atproto_client.app.bsky.notification.update_seen({'seen_at': last_seen_at})
            logger.info(f"Queued {new_count} new notifications and marked as seen")

        # Process the queue again, now including the freshly queued items.
        load_and_process_queued_notifications(void_agent, atproto_client)

    except Exception as e:
        logger.error(f"Error processing notifications: {e}")
365
366
def main():
    """Main bot loop that continuously monitors for notifications."""
    logger.info("Initializing Void bot...")

    # One-time setup: the Letta agent and the Bluesky session.
    agent = initialize_void()
    logger.info(f"Void agent initialized: {agent.id}")

    bsky = bsky_utils.default_login()
    logger.info("Connected to Bluesky")

    logger.info(f"Starting notification monitoring (checking every {FETCH_NOTIFICATIONS_DELAY_SEC} seconds)...")

    # Poll until the user interrupts with Ctrl-C.
    running = True
    while running:
        try:
            process_notifications(agent, bsky)
            print("Sleeping")
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC)
        except KeyboardInterrupt:
            logger.info("Bot stopped by user")
            running = False
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
            # Back off for twice the normal delay after an unexpected error.
            sleep(FETCH_NOTIFICATIONS_DELAY_SEC * 2)
395
396
# Script entry point: run the bot loop until interrupted.
if __name__ == "__main__":
    main()